{"id":46742,"date":"2023-02-02T00:00:00","date_gmt":"2023-02-02T08:00:00","guid":{"rendered":"https:\/\/griddb-linux-hte8hndjf8cka8ht.westus-01.azurewebsites.net\/blog\/scheduling-data-migration-from-postgresql-to-griddb-using-apache-airflow\/"},"modified":"2025-11-13T12:56:25","modified_gmt":"2025-11-13T20:56:25","slug":"scheduling-data-migration-from-postgresql-to-griddb-using-apache-airflow","status":"publish","type":"post","link":"https:\/\/www.griddb.net\/en\/blog\/scheduling-data-migration-from-postgresql-to-griddb-using-apache-airflow\/","title":{"rendered":"Scheduling Data Migration from PostgreSQL to GridDB using Apache Airflow"},"content":{"rendered":"<p>In a previous blog, we discussed migrating a <a href=\"https:\/\/www.kaggle.com\/datasets\/garystafford\/environmental-sensor-data-132k\">sample IoT dataset<\/a> from PostgreSQL to GridDB. To accomplish this, we used the official GridDB import\/export tools, walking through exactly how to use the tool and considering why a user may want to shift from PostgreSQL over to GridDB. You can read about that process here: <a href=\"https:\/\/griddb.net\/en\/blog\/using-the-griddb-import-export-tools-to-migrate-from-postgresql-to-griddb\/\">Using the GridDB Import\/Export Tools to Migrate from PostgreSQL to GridDB<\/a><\/p>\n<p>In this article, we will again consider the idea of migrating from PostgreSQL, but rather than stick to the import\/export tools, we will be showcasing <a href=\"https:\/\/airflow.apache.org\/\">Apache Airflow<\/a>. If you are unfamiliar with Airflow, it &#8220;is a platform to programmatically author, schedule, and monitor workflows&#8221;. 
To put Airflow&#8217;s description into simpler terms: Airflow allows you to use Python code to schedule workflows &#8212; these workflows are usually broken up into smaller tasks which can be orchestrated to run in a sequence of your choosing.<\/p>\n<p>For this blog, we will be using Airflow to migrate the same dataset from our previous blog. Once that is accomplished, we will also be scheduling a <a href=\"https:\/\/airflow.apache.org\/docs\/apache-airflow\/1.10.12\/concepts.html\">DAG (Directed Acyclic Graph)<\/a> to periodically migrate new rows from PostgreSQL over to GridDB, ensuring that our two databases are always at parity (well, at least after our scheduled intervals).<\/p>\n<p>Before we get into the technical aspects of this project, let&#8217;s first get Airflow installed onto our machine, along with all prerequisites.<\/p>\n<h2>Preparing and Installation<\/h2>\n<p>As stated, this section will go over installing this project onto your machine. The nice thing about Airflow is that they provide docker images to make installing, sharing, and extending the installation an easy endeavor. This article works off of using Airflow in a docker container, but more on that later.<\/p>\n<h3>Prerequisites<\/h3>\n<p>To follow along, you will need the following:<\/p>\n<ul>\n<li><a href=\"https:\/\/docs.docker.com\/get-docker\/\">Docker<\/a><\/li>\n<li><a href=\"https:\/\/docs.docker.com\/compose\/install\/\">Docker-compose<\/a><\/li>\n<\/ul>\n<p>That&#8217;s it! 
All other databases and libraries are installed via docker containers.<\/p>\n<h3>Grabbing Source Code<\/h3>\n<p>To grab all source code for this project, please clone the following repository: <a href=\"https:\/\/github.com\/griddbnet\/Blogs\/tree\/apache_airflow\">GitHub<\/a><\/p>\n<div class=\"clipboard\">\n<pre><code class=\"language-sh\">$ git clone https:\/\/github.com\/griddbnet\/Blogs.git --branch apache_airflow<\/code><\/pre>\n<\/div>\n<p>Once it&#8217;s cloned, you will have a folder with all of the necessary docker files and code needed to begin.<\/p>\n<h3>The Docker Containers<\/h3>\n<p>We will first go through the Dockerfiles included inside the repo shared above and then finally we will touch on the <code>docker-compose<\/code> file before running our project.<\/p>\n<h4>Dockerfile for Extending the Airflow Image<\/h4>\n<p>To start, you will notice there is a file called <code>Dockerfile.airflow<\/code>. This file <a href=\"https:\/\/airflow.apache.org\/docs\/docker-stack\/build.html#quick-start-scenarios-of-image-extending\">extends<\/a> the original <code>apache\/airflow<\/code> image to add some of the Python libraries we need for our project.<\/p>\n<p>The file looks like so:<\/p>\n<div class=\"clipboard\">\n<pre><code class=\"language-sh\">FROM apache\/airflow:latest-python3.10\nCOPY requirements.txt \/requirements.txt\n\nUSER root\nRUN apt-get update \\\n  && apt-get install -y --no-install-recommends \\\n         default-jre wget build-essential swig \\\n  && apt-get autoremove -yqq --purge \\\n  && apt-get clean \\\n  && rm -rf \/var\/lib\/apt\/lists\/*\n\nRUN wget https:\/\/repo1.maven.org\/maven2\/com\/github\/griddb\/gridstore-jdbc\/5.1.0\/gridstore-jdbc-5.1.0.jar -P \/usr\/share\/java\n\n# Install GridDB c_client\nWORKDIR \/\nRUN wget --no-check-certificate https:\/\/github.com\/griddb\/c_client\/releases\/download\/v5.0.0\/griddb-c-client_5.0.0_amd64.deb\nRUN dpkg -i griddb-c-client_5.0.0_amd64.deb\n\nUSER airflow\n\nRUN pip install --user --upgrade pip\nRUN pip install --no-cache-dir --user -r \/requirements.txt\nRUN pip install --no-cache-dir apache-airflow-providers-common-sql\nRUN pip install --no-cache-dir apache-airflow-providers-jdbc\nRUN pip install --no-cache-dir griddb-python\n\nENV CLASSPATH=\/usr\/share\/java\/gridstore-jdbc-5.1.0.jar<\/code><\/pre>\n<\/div>\n<p>To start, for this project, we are installing JDBC and the GridDB Python client. We will not actually be using JDBC for the remainder of this project, but it is included because JDBC allows full use of SQL and may come in handy for future projects. More on connecting via JDBC later.<\/p>\n<p>For the GridDB Python client to be installed, we need <code>python3.10<\/code>, which is why we chose to extend <code>apache\/airflow:latest-python3.10<\/code>. With the GridDB Python Client installed, we can access GridDB via <a href=\"http:\/\/www.toshiba-sol.co.jp\/en\/pro\/griddb\/docs-en\/v4_3\/GridDB_TQL_Reference.html\">TQL<\/a> and work with GridDB directly from a Python script &#8212; this ties into one of the major benefits of using GridDB over PostgreSQL.<\/p>\n<h4>Dockerfile for Extending the GridDB Image<\/h4>\n<p>The next file I&#8217;d like to showcase here is <code>Dockerfile.griddb<\/code>. 
Here are the contents:<\/p>\n<div class=\"clipboard\">\n<pre><code class=\"language-sh\">FROM griddb\/griddb\n\nUSER root\n\n# Install GridDB c_client\nWORKDIR \/\nRUN wget --no-check-certificate https:\/\/github.com\/griddb\/c_client\/releases\/download\/v5.0.0\/griddb-c-client_5.0.0_amd64.deb\nRUN dpkg -i griddb-c-client_5.0.0_amd64.deb\n\nRUN wget --no-check-certificate https:\/\/github.com\/griddb\/cli\/releases\/download\/v5.0.0\/griddb-ce-cli_5.0.0_amd64.deb\nRUN dpkg -i griddb-ce-cli_5.0.0_amd64.deb\n\nUSER gsadm<\/code><\/pre>\n<\/div>\n<p>We are extending the base GridDB image solely to add in the GridDB C-Client as a prerequisite for other GridDB libraries; we are also adding in the GridDB CLI tool so that we may query our DB easily if needed.<\/p>\n<h4>Airflow&#8217;s Docker-Compose File<\/h4>\n<p>The most important part of the installation process is using the <code>docker-compose.yml<\/code> file provided by the Airflow team; this file contains all of the different services\/containers which are needed to run the Airflow tool. The nice thing about using <code>docker-compose<\/code> is that all of the services contained within the file are automatically placed within a shared network space. Not only that, but we can bring up and bring down all services with a single command &#8212; including those images which we extended ourselves. 
Handy!<\/p>\n<p>I will not be showcasing the entirety of the contents here as the file is really large, but here is a snippet:<\/p>\n<div class=\"clipboard\">\n<pre><code class=\"language-sh\">---\nversion: '3'\nx-airflow-common:\n  &airflow-common\n  image: ${AIRFLOW_IMAGE_NAME:-extending_airflow:latest}\n  environment:\n    &airflow-common-env\n    AIRFLOW__CORE__EXECUTOR: CeleryExecutor\n    AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2:\/\/airflow:airflow@postgres\/airflow\n    # For backward compatibility, with Airflow &lt;2.3\n    AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql+psycopg2:\/\/airflow:airflow@postgres\/airflow\n    AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql:\/\/airflow:airflow@postgres\/airflow\n    AIRFLOW__CELERY__BROKER_URL: redis:\/\/:@redis:6379\/0\n    AIRFLOW__CORE__FERNET_KEY: ''\n    AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true'\n    AIRFLOW__CORE__LOAD_EXAMPLES: 'true'\n    AIRFLOW__API__AUTH_BACKENDS: 'airflow.api.auth.backend.basic_auth'\n    _PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:-}\n  volumes:\n    - .\/dags:\/opt\/airflow\/dags\n    - .\/logs:\/opt\/airflow\/logs\n    - .\/plugins:\/opt\/airflow\/plugins\n  user: \"${AIRFLOW_UID:-50000}:0\"\n  depends_on:\n    &airflow-common-depends-on\n    redis:\n      condition: service_healthy\n    postgres:\n      condition: service_healthy\n\nservices:\n  griddb-server:\n    build:\n      context: .\n      dockerfile: Dockerfile.griddb\n    expose:\n      - \"10001\"\n      - \"10010\"\n      - \"10020\"\n      - \"10040\"\n      - \"20001\"\n      - \"41999\"\n    environment:\n      NOTIFICATION_MEMBER: 1\n      GRIDDB_CLUSTER_NAME: myCluster\n\n  postgres:\n    image: postgres:13\n    environment:\n      POSTGRES_USER: airflow\n      POSTGRES_PASSWORD: airflow\n      POSTGRES_DB: airflow\n    volumes:\n      - postgres-db-volume:\/var\/lib\/postgresql\/data\n      - .\/dags:\/var\/lib\/postgresql\/dags\n    ports:\n      - 5432:5432\n    healthcheck:\n      test: [\"CMD\", \"pg_isready\", \"-U\", \"airflow\"]\n      interval: 5s\n      retries: 5\n    restart: always\n\n  airflow-webserver:\n    &lt;&lt;: *airflow-common\n    command: webserver\n    ports:\n      - 8080:8080\n    healthcheck:\n      test: [\"CMD\", \"curl\", \"--fail\", \"http:\/\/localhost:8080\/health\"]\n      interval: 10s\n      timeout: 10s\n      retries: 5\n    restart: always\n    depends_on:\n      &lt;&lt;: *airflow-common-depends-on\n      airflow-init:\n        condition: service_completed_successfully<\/code><\/pre>\n<\/div>\n<p>As can be seen here, this file contains instructions for running various containers, including the two databases being showcased here. You will also notice that the GridDB service is being built directly from the local <code>Dockerfile.griddb<\/code> file. Astute observers may also notice that Airflow is not getting the same treatment despite it also having its own <code>Dockerfile<\/code> in this repository.<\/p>\n<p>The reason for that is that the original Airflow compose orchestration file uses the <code>x-airflow-common<\/code> block to keep settings consistent among the many different services. To play nice with this sort of docker-compose file building, we must build our image locally so that the compose file can use that pre-built image for the purposes of this project.<\/p>\n<h3>Running the Docker Containers<\/h3>\n<p>As stated above, the very first thing we must do is build our <code>Dockerfile.airflow<\/code> image, and once it&#8217;s done, we can finally use the <code>docker-compose<\/code> command to run everything at once.<\/p>\n<div class=\"clipboard\">\n<pre><code class=\"language-sh\">$ docker build -f Dockerfile.airflow . 
--tag extending_airflow:latest<\/code><\/pre>\n<\/div>\n<p>Once this image is built, you should be able to see the <code>extending_airflow:latest<\/code> image in your local environment (<code>$ docker images<\/code>).<\/p>\n<p>And now that this is ready, we can go ahead and run all of our services.<\/p>\n<div class=\"clipboard\">\n<pre><code class=\"language-sh\">$ docker-compose up -d<\/code><\/pre>\n<\/div>\n<pre><code>[+] Running 8\/8\n\u283f Container griddb-airflow      Running                                                                  0.0s\n\u283f Container airflow-redis-1              Healthy                                                                 18.6s\n\u283f Container postgres-airflow           Healthy                                                                 18.6s\n\u283f Container airflow-airflow-init-1       Exited                                                                  37.2s\n\u283f Container airflow-airflow-triggerer-1  Started                                                                 37.9s\n\u283f Container airflow-airflow-scheduler-1  Started                                                                 37.9s\n\u283f Container airflow-airflow-worker-1     Started                                                                 37.9s\n\u283f Container airflow-airflow-webserver-1  Started                                                                 37.5s\n<\/code><\/pre>\n<p>This will go through and grab all relevant images from dockerhub or build the images locally (<code>Dockerfile.griddb<\/code>) and then once it&#8217;s ready, all containers will be running directly on your machine.<\/p>\n<p>This can be verified by running the <code>process status<\/code> command:<\/p>\n<div class=\"clipboard\">\n<pre><code class=\"language-sh\">$ docker ps<\/code><\/pre>\n<\/div>\n<pre><code>CONTAINER ID   IMAGE                                      COMMAND                  CREATED              STATUS                        
  PORTS                                                                                                                                                                                                                                                                                                                NAMES\nad1511b22415   extending_airflow:latest                   \"\/usr\/bin\/dumb-init \u2026\"   About a minute ago   Up About a minute (healthy)     0.0.0.0:8080-&gt;8080\/tcp, :::8080-&gt;8080\/tcp                                                                                                                                                                                                                                                                            3_airflow_migration-airflow-webserver-1\n68bdef86bf4f   extending_airflow:latest                   \"\/usr\/bin\/dumb-init \u2026\"   About a minute ago   Up About a minute (healthy)     8080\/tcp                                                                                                                                                                                                                                                                                                             3_airflow_migration-airflow-scheduler-1\nb020473ce932   extending_airflow:latest                   \"\/usr\/bin\/dumb-init \u2026\"   About a minute ago   Up About a minute (healthy)     8080\/tcp                                                                                                                                                                                                                                                                                                             3_airflow_migration-airflow-triggerer-1\n69c99ea8ce87   extending_airflow:latest                   \"\/usr\/bin\/dumb-init \u2026\"   About a minute ago   Up About a minute (healthy)     8080\/tcp                             
                                                                                                                                                                                                                                                                                3_airflow_migration-airflow-worker-1\n9c778c1f1d72   3_airflow_migration_griddb-server          \"\/bin\/bash \/start-gr\u2026\"   About a minute ago   Up About a minute               10001\/tcp, 10010\/tcp, 10020\/tcp, 10040\/tcp, 20001\/tcp, 41999\/tcp                                                                                                                                                                                                                                                     griddb-airflow\n921cc78b8c1b   redis:latest                               \"docker-entrypoint.s\u2026\"   About a minute ago   Up About a minute (healthy)     6379\/tcp                                                                                                                                                                                                                                                                                                             3_airflow_migration-redis-1\n0ba780dd6b99   postgres:13                                \"docker-entrypoint.s\u2026\"   About a minute ago   Up About a minute (healthy)     0.0.0.0:5432-&gt;5432\/tcp, :::5432-&gt;5432\/tcp                                                                                                                                                                                                                                                                            postgres-airflow\n<\/code><\/pre>\n<h2>Using Apache Airflow<\/h2>\n<p>Now that we have our tool up and running, the next thing we will want to do is of course interface with it. 
One of the containers\/services within our docker-compose file hosts a webserver with a nice UI for managing your workflows.<\/p>\n<p>So, in a browser, head over to http:\/\/localhost:8080\/ and enter in your credentials (username and password are both <code>airflow<\/code>). From here, you will see a big list of premade DAGs; these are the workflows we can manage to orchestrate our data flows. In our case, we want to make two different DAGs: one for the one-time migration of all of our PostgreSQL data, and another for a continuous migration.<\/p>\n<p>But before we do that, let&#8217;s first make sure our newly created PostgreSQL database has the data we wish to showcase being migrated over.<\/p>\n<p><a href=\"https:\/\/griddb.net\/wp-content\/uploads\/2023\/01\/airflow-1.png\"><img fetchpriority=\"high\" decoding=\"async\" src=\"https:\/\/griddb.net\/wp-content\/uploads\/2023\/01\/airflow-1.png\" alt=\"\" width=\"1396\" height=\"646\" class=\"aligncenter size-full wp-image-29061\" srcset=\"\/wp-content\/uploads\/2023\/01\/airflow-1.png 1396w, \/wp-content\/uploads\/2023\/01\/airflow-1-300x139.png 300w, \/wp-content\/uploads\/2023\/01\/airflow-1-1024x474.png 1024w, \/wp-content\/uploads\/2023\/01\/airflow-1-768x355.png 768w, \/wp-content\/uploads\/2023\/01\/airflow-1-600x278.png 600w\" sizes=\"(max-width: 1396px) 100vw, 1396px\" \/><\/a><\/p>\n<h3>Using the Airflow UI to Make Connections to our Databases<\/h3>\n<p>Before we head into writing our Python code (DAGs), let&#8217;s first make sure our Airflow scheduler\/worker\/webserver can interact with our GridDB server and PostgreSQL databases. 
As explained before, because all of these services share a docker-compose file, they will automatically all share the same network space.<\/p>\n<p>First, let&#8217;s connect to PostgreSQL.<\/p>\n<h4>Connect to PostgreSQL<\/h4>\n<p>From the browser, navigate over to Admin &#8211;> Connections.<\/p>\n<p>Now we can make an explicit connection to our targeted database. For PostgreSQL, select it from the connection type dropdown and then enter in your credentials:<\/p>\n<div class=\"clipboard\">\n<pre><code class=\"language-sh\">Host: postgres\nSchema: airflow\nLogin: airflow\nPassword: airflow\nPort: 5432<\/code><\/pre>\n<\/div>\n<p>Once you hit Test, the connection should show up as successful. As a note, the host is the name of your service from within the docker-compose file (the service name doubles as its hostname, which resolves to its IP address in the shared network).<\/p>\n<p><a href=\"https:\/\/griddb.net\/wp-content\/uploads\/2023\/02\/airflow-2.png\"><img decoding=\"async\" src=\"https:\/\/griddb.net\/wp-content\/uploads\/2023\/02\/airflow-2.png\" alt=\"\" width=\"1079\" height=\"875\" class=\"aligncenter size-full wp-image-29383\" srcset=\"\/wp-content\/uploads\/2023\/02\/airflow-2.png 1079w, \/wp-content\/uploads\/2023\/02\/airflow-2-300x243.png 300w, \/wp-content\/uploads\/2023\/02\/airflow-2-1024x830.png 1024w, \/wp-content\/uploads\/2023\/02\/airflow-2-768x623.png 768w, \/wp-content\/uploads\/2023\/02\/airflow-2-600x487.png 600w\" sizes=\"(max-width: 1079px) 100vw, 1079px\" \/><\/a><\/p>\n<h4>Connect to GridDB<\/h4>\n<p>To connect to GridDB, you can connect via JDBC in the same vein as mentioned above.<\/p>\n<div class=\"clipboard\">\n<pre><code class=\"language-sh\">Connection Type: JDBC Connection\nConnection URL: jdbc:gs:\/\/griddb-server:20001\/myCluster\/public\nLogin: admin\nPassword: admin\nDriver Path: \/usr\/share\/java\/gridstore-jdbc-5.1.0.jar\nDriver Class: 
com.toshiba.mwcloud.gs.sql.Driver<\/code><\/pre>\n<\/div>\n<p><a href=\"https:\/\/griddb.net\/wp-content\/uploads\/2023\/01\/airflow-3.png\"><img decoding=\"async\" src=\"https:\/\/griddb.net\/wp-content\/uploads\/2023\/01\/airflow-3.png\" alt=\"\" width=\"1873\" height=\"1260\" class=\"aligncenter size-full wp-image-29063\" srcset=\"\/wp-content\/uploads\/2023\/01\/airflow-3.png 1873w, \/wp-content\/uploads\/2023\/01\/airflow-3-300x202.png 300w, \/wp-content\/uploads\/2023\/01\/airflow-3-1024x689.png 1024w, \/wp-content\/uploads\/2023\/01\/airflow-3-768x517.png 768w, \/wp-content\/uploads\/2023\/01\/airflow-3-1536x1033.png 1536w, \/wp-content\/uploads\/2023\/01\/airflow-3-600x404.png 600w\" sizes=\"(max-width: 1873px) 100vw, 1873px\" \/><\/a><\/p>\n<p>But you can also connect via the Python client:<\/p>\n<div class=\"clipboard\">\n<pre><code class=\"language-python\">import griddb_python as griddb\n\nfactory = griddb.StoreFactory.get_instance()\nDB_HOST = \"griddb-server:10001\" # griddb-server is the hostname of the service\nDB_CLUSTER = \"myCluster\"\nDB_USER = \"admin\"\nDB_PASS = \"admin\"\n\ngridstore = factory.get_store(\n    notification_member=DB_HOST, cluster_name=DB_CLUSTER, username=DB_USER, password=DB_PASS\n)<\/code><\/pre>\n<\/div>\n<p>The above Python code is only inserted inside the Python DAGs in which you intend to use GridDB; it is not made explicit in the Connections tab of your UI.<\/p>\n<h3>Ingesting Data into PostgreSQL Container Database<\/h3>\n<p>For demo purposes, we will ingest the relevant data that we need into our database by copying over the CSV to our PostgreSQL container and then using the <code>COPY<\/code> command.<\/p>\n<p>First up, we will be copying a <code>.csv<\/code> file over from our local machine to the PostgreSQL container. This can be accomplished by using the container name (postgres-airflow, named by our <code>docker-compose<\/code> file) and the <code>docker cp<\/code> command. 
As for the file we are copying over, you can find it within <code>dags\/data\/device.csv<\/code>.<\/p>\n<div class=\"clipboard\">\n<pre><code class=\"language-sh\">$ docker cp dags\/data\/device.csv postgres-airflow:\/tmp\/<\/code><\/pre>\n<\/div>\n<p>The <code>docker cp<\/code> command is executed similarly to the <code>scp<\/code> or <code>cp<\/code> command in normal CLI operations.<\/p>\n<p>Once you copy over the csv file, please open a shell inside your PostgreSQL container:<\/p>\n<div class=\"clipboard\">\n<pre><code class=\"language-sh\">$ docker exec -it postgres-airflow bash<\/code><\/pre>\n<\/div>\n<p>Once in there, drop into the <code>psql<\/code> shell as the user airflow.<\/p>\n<div class=\"clipboard\">\n<pre><code class=\"language-sh\"># psql -U airflow\npsql (13.9 (Debian 13.9-1.pgdg110+1))\nType \"help\" for help.\n\nairflow=#<\/code><\/pre>\n<\/div>\n<p>From here, it&#8217;s trivial to ingest the CSV data into our DB. First we create our table and then tell our database to copy the CSV rows into that table.<\/p>\n<div class=\"clipboard\">\n<pre><code class=\"language-sh\">airflow=# CREATE TABLE if not exists device ( ts timestamp, device varchar(30), co float8, humidity float8, light bool, lpg float8, motion bool, smoke float8, temp float8 );<\/code><\/pre>\n<\/div>\n<pre><code>CREATE TABLE\n<\/code><\/pre>\n<p>And then <code>COPY<\/code> everything:<\/p>\n<div class=\"clipboard\">\n<pre><code class=\"language-sh\">airflow=# copy device(ts, device, co, humidity, light, lpg, motion, smoke, temp) from '\/tmp\/device.csv' DELIMITER ',' CSV HEADER;<\/code><\/pre>\n<\/div>\n<h3>Migrating from PostgreSQL to GridDB<\/h3>\n<p>And now we will create our first DAG to accomplish our task. Because Airflow is built on Python, our DAGs are simply Python code with some simple dressing. Before we get into the file snippet, I will point out that since we are using the GridDB Python client, we can easily interface with our GridDB database directly from within our Airflow DAG. 
So when you see the code involved, a lot of it will not be too different from a normal Python application.<\/p>\n<h4>Airflow-Specific Python Code<\/h4>\n<p>Anyway, here is a snippet of our GridDB Migration DAG. This first portion will be kept to showcasing only the Airflow-specific portion of the code. First, we will import the Airflow libraries necessary to finish our task.<\/p>\n<div class=\"clipboard\">\n<pre><code class=\"language-python\">from datetime import datetime, timedelta\n\nfrom airflow import DAG\nfrom airflow.providers.postgres.operators.postgres import PostgresOperator\nfrom airflow.hooks.postgres_hook import PostgresHook\n\nfrom airflow.operators.python import PythonOperator\n\ndefault_args = {\n    'owner': 'israel',\n    'retries': 5,\n    'retry_delay': timedelta(minutes=5)\n}\n\nwith DAG(\n    dag_id='dag_migrating_postgres_to_griddb_v04',\n    default_args=default_args,\n    start_date=datetime(2021, 12, 19),\n    schedule_interval='@once'\n) as dag:\n\n    task1 = PythonOperator(\n        task_id='migrate_from_postgres_to_griddb',\n        python_callable=migrate_from_postgres_to_griddb\n    )\n\n    task1<\/code><\/pre>\n<\/div>\n<p>Once that&#8217;s out of the way, we will put in config options for our DAG, namely its ID (name) and our chosen interval for this task to be run. One thing I will point out about this specific DAG: the scheduled interval is simply <code>@once<\/code>, which means that after we run this DAG once, it will cease to schedule itself to run again &#8212; the continuous migration will be handled by a separate DAG.<\/p>\n<p>Moving on, at the very bottom of the file we select a sequence of what tasks we would like done and in what order. 
For this workflow, we are simply calling one task, task1, which calls the Python function <code>migrate_from_postgres_to_griddb<\/code>.<\/p>\n<p>The actual migration works how you may expect: it will query PostgreSQL, take all relevant data, transform it for our needs, and then <code>put<\/code> it into our GridDB server.<\/p>\n<h4>Connecting and Querying PostgreSQL within DAG<\/h4>\n<p>Here is that Python function which handles the actual migration:<\/p>\n<div class=\"clipboard\">\n<pre><code class=\"language-python\">def migrate_from_postgres_to_griddb(**context):\n    \"\"\"\n    Queries Postgres and places all data into GridDB\n    \"\"\"\n\n    gridstore = factory.get_store(\n        notification_member=DB_HOST, cluster_name=DB_CLUSTER, username=DB_USER, password=DB_PASS\n    )\n\n    postgres = PostgresHook(postgres_conn_id=\"postgres\")\n    conn = postgres.get_conn()\n    cursor = conn.cursor()\n    cursor.execute(\"SELECT * FROM device;\")<\/code><\/pre>\n<\/div>\n<p>Similar to our previous blog, we are not simply taking the PostgreSQL table as is and placing it within our GridDB database. Instead, we are following a more IoT-centric schema, one in which each sensor in a dataset has its own container (table). But before we get into that, we must first make the necessary connections to each database.<\/p>\n<p>To connect to PostgreSQL, we can pull directly from the connection we established earlier in this article. To do so within our DAG, we can import the PostgresHook Python library that we introduced into the container with our <code>Dockerfile.airflow<\/code> file. 
And then we simply point to our Postgres&#8217;s connection ID: <code>postgres = PostgresHook(postgres_conn_id=\"postgres\")<\/code>.<\/p>\n<p>Once connected, we can use the cursor to conduct our queries; in this case we simply want to grab all data from our <code>device<\/code> table.<\/p>\n<h4>Connecting To GridDB<\/h4>\n<p>Similarly, we can connect to our GridDB server by either utilizing the JdbcHook, similar to the steps above for PostgreSQL, or by using the GridDB Python Client. Using the Python client allows for use similar to any other Python application and is a bit easier to work with, so we will be utilizing this method. As for the connection details, because these services share a <code>docker-compose<\/code> environment, we can simply use the hostname.<\/p>\n<div class=\"clipboard\">\n<pre><code class=\"language-python\">factory = griddb.StoreFactory.get_instance()\nDB_HOST = \"griddb-server:10001\"\nDB_CLUSTER = \"myCluster\"\nDB_USER = \"admin\"\nDB_PASS = \"admin\"\n\ngridstore = factory.get_store(\n    notification_member=DB_HOST, cluster_name=DB_CLUSTER, username=DB_USER, password=DB_PASS\n)<\/code><\/pre>\n<\/div>\n<p>The rest of the details are the defaults used for GridDB in <code>FIXED_LIST<\/code> mode. And indeed, if you are working on Python code with GridDB outside of the context of Docker containers, you would be able to connect the same way; you&#8217;d just need to replace <code>DB_HOST<\/code> with the proper IP address (the port stays the same).<\/p>\n<p>Once connected, we can begin transforming our rows from PostgreSQL and placing those rows into GridDB.<\/p>\n<h4>Creating Containers with GridDB<\/h4>\n<p>As explained above, we will need to take all of our rows from PostgreSQL and split them out into three different containers on the GridDB side. Each unique device name from our original dataset gets its own Time Series container for that sensor&#8217;s data. 
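<\/p>
<p>Since each sensor&#8217;s MAC address consistently maps to one container name, a small helper can centralize that mapping. This is just a sketch; the <code>container_for<\/code> helper and its dictionary are hypothetical and not part of the repo&#8217;s DAG, though the MAC addresses come from the dataset itself:<\/p>

```python
# Hypothetical mapping from the dataset's sensor MAC addresses to the
# GridDB container names used throughout this article.
DEVICE_CONTAINERS = {
    "b8:27:eb:bf:9d:51": "device1",
    "00:0f:00:70:91:0a": "device2",
    "1c:bf:ce:15:ec:4d": "device3",
}

def container_for(mac):
    """Return the GridDB container name for a given sensor MAC address."""
    try:
        return DEVICE_CONTAINERS[mac]
    except KeyError:
        raise ValueError("unknown device: " + mac)
```

<p>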
So let&#8217;s make those three containers first &#8212; we can accomplish this within our DAG using Python code. We can also set the schema at creation time:<\/p>\n<div class=\"clipboard\">\n<pre><code class=\"language-python\">def create_container(gridstore, device_name):\n    gridstore.drop_container(device_name)\n    conInfo = griddb.ContainerInfo(name=device_name,\n                column_info_list=[[\"ts\", griddb.Type.TIMESTAMP],\n                                    [\"co\", griddb.Type.DOUBLE],\n                                    [\"humidity\", griddb.Type.DOUBLE],\n                                    [\"light\", griddb.Type.BOOL],\n                                    [\"lpg\", griddb.Type.DOUBLE],\n                                    [\"motion\", griddb.Type.BOOL],\n                                    [\"smoke\", griddb.Type.DOUBLE],\n                                    [\"temperature\", griddb.Type.DOUBLE]],\n                type=griddb.ContainerType.TIME_SERIES)\n    # Create the container\n    try:\n        gridstore.put_container(conInfo)\n        print(conInfo.name, \"container successfully created\")\n    except griddb.GSException as e:\n        for i in range(e.get_error_stack_size()):\n            print(\"[\", i, \"]\")\n            print(e.get_error_code(i))\n            print(e.get_location(i))\n            print(e.get_message(i))<\/code><\/pre>\n<\/div>\n<p>Here we are going to create a container with the schema as seen in <code>column_info_list<\/code>. 
We split this out into its own function because it will need to be run once for each of the three unique sensors.<\/p>\n<p>So eventually we can run something simple like this:<\/p>\n<div class=\"clipboard\">\n<pre><code class=\"language-python\">  container_name_list = [\"device1\", \"device2\", \"device3\"]\n\n  for container_name in container_name_list:\n      create_container(gridstore, container_name)<\/code><\/pre>\n<\/div>\n<h4>Transforming Data from PostgreSQL for GridDB Ingestion<\/h4>\n<p>Now finally let&#8217;s place our original data into a <a href=\"https:\/\/griddb.net\/en\/blog\/using-pandas-dataframes-with-griddb\/\">dataframe<\/a> so that it&#8217;s easier to manipulate and split out. From there we simply put each new object containing our rows into GridDB:<\/p>\n<div class=\"clipboard\">\n<pre><code class=\"language-python\">    cursor.execute(\"SELECT * FROM device;\")\n    import pandas as pd\n    rows = pd.DataFrame(cursor.fetchall())\n\n    dfs = dict(tuple(rows.groupby([1])))\n\n    device1 = dfs['b8:27:eb:bf:9d:51']\n    device1 = device1.drop([1], axis=1)\n    device1 = device1.values.tolist()\n\n    device2 = dfs['00:0f:00:70:91:0a']\n    device2 = device2.drop([1], axis=1)\n    device2 = device2.values.tolist()\n\n    device3 = dfs['1c:bf:ce:15:ec:4d']\n    device3 = device3.drop([1], axis=1)\n    device3 = device3.values.tolist()<\/code><\/pre>\n<\/div>\n<p>Each deviceX object will contain all rows which can be placed directly into GridDB with <code>multi_put<\/code>:<\/p>\n<div class=\"clipboard\">\n<pre><code class=\"language-python\">    try: \n        d1_cont = gridstore.get_container(\"device1\")\n        d1_cont.multi_put(device1)\n\n        d2_cont = gridstore.get_container(\"device2\")\n        d2_cont.multi_put(device2)\n\n        d3_cont = gridstore.get_container(\"device3\")\n        d3_cont.multi_put(device3)\n\n    except griddb.GSException as e:\n        for i in range(e.get_error_stack_size()):\n            print(\"[\", i, 
\"]\")\n            print(e.get_error_code(i))\n            print(e.get_location(i))\n            print(e.get_message(i))<\/code><\/pre>\n<\/div>\n<p>And from there our GridDB database should have three new containers populated with all relevant sensor information from our Kaggle dataset.<\/p>\n<h3>Continuous Migration<\/h3>\n<p>Now that our databases are at parity, let&#8217;s move on to the next step. Let&#8217;s say your sensors keep emitting data to PostgreSQL that you would like to send over to GridDB. With a regularly scheduled DAG, we can make sure all new rows are pushed to GridDB.<\/p>\n<p>The idea is that we query each sensor&#8217;s container for the latest (<code>MAX<\/code>) timestamp, which is then cross-referenced with PostgreSQL. If the data is newer or &#8220;bigger&#8221; on PostgreSQL, we take the missing rows and push them into the proper container.<\/p>\n<h4>Querying for MAX Timestamp on GridDB<\/h4>\n<p>After making our connection to GridDB, we need to formulate our query. In this case, because TQL operates on a per-container basis, there is no need to specify the container name within the query itself. 
This means we only need to formulate one string and it can be reused for all containers:<\/p>\n<div class=\"clipboard\">\n<pre><code class=\"language-python\">    try:\n        d1_cont = gridstore.get_container(\"device1\")\n        d2_cont = gridstore.get_container(\"device2\")\n        d3_cont = gridstore.get_container(\"device3\")\n\n        sql = \"SELECT MAX(ts)\"\n\n        d1_query = d1_cont.query(sql)\n        d2_query = d2_cont.query(sql)\n        d3_query = d3_cont.query(sql)<\/code><\/pre>\n<\/div>\n<p>Here we are using the TQL <code>MAX<\/code> aggregation, which returns the latest timestamp (ts) in each container.<\/p>\n<h4>Querying PostgreSQL Using GridDB Timestamp Values<\/h4>\n<p>Once that value is saved, we simply use it to query PostgreSQL:<\/p>\n<div class=\"clipboard\">\n<pre><code class=\"language-python\">        d1_rs = d1_query.fetch()\n        d2_rs = d2_query.fetch()\n        d3_rs = d3_query.fetch()\n\n        d1_row = d1_rs.next().get(griddb.Type.TIMESTAMP)\n        d1_latest_time = d1_row.replace(microsecond=999999)  # pad to max microseconds; GridDB stores timestamps at millisecond precision\n\n        d2_row = d2_rs.next().get(griddb.Type.TIMESTAMP)\n        d2_latest_time = d2_row.replace(microsecond=999999)\n\n        d3_row = d3_rs.next().get(griddb.Type.TIMESTAMP)\n        d3_latest_time = d3_row.replace(microsecond=999999)\n\n        d1_sql = \"SELECT DISTINCT ON (ts) * FROM device WHERE ts > '\" + str(d1_latest_time) + \"' AND device = 'b8:27:eb:bf:9d:51' ORDER BY ts DESC;\"\n        d2_sql = \"SELECT DISTINCT ON (ts) * FROM device WHERE ts > '\" + str(d2_latest_time) + \"' AND device = '00:0f:00:70:91:0a' ORDER BY ts DESC;\"\n        d3_sql = \"SELECT DISTINCT ON (ts) * FROM device WHERE ts > '\" + str(d3_latest_time) + \"' AND device = '1c:bf:ce:15:ec:4d' ORDER BY ts DESC;\"\n\n        cursor.execute(d1_sql)\n        d1_result = cursor.fetchall()\n\n        cursor.execute(d2_sql)\n        d2_result = 
cursor.fetchall()\n\n        cursor.execute(d3_sql)\n        d3_result = cursor.fetchall()<\/code><\/pre>\n<\/div>\n<p>We now have three objects containing any rows newer than their GridDB counterparts. Next we check whether each list is empty; if not, we place the rows into GridDB:<\/p>\n<div class=\"clipboard\">\n<pre><code class=\"language-python\">    if not d1_result:\n        print(\"Device1 contains 0 new rows to add\")\n    else:\n        print(d1_latest_time)\n        print(d1_sql)\n        print(\"Device1 contains \" + str(len(d1_result)) + \" new rows to add\")\n        for row in d1_result:\n            print(\"putting row to device1 in GridDB\")\n            row = list(row)\n            del row[1]  # get rid of device name\n            print(row)\n            d1_cont.put(row)\n\n    if not d2_result:\n        print(\"Device2 contains 0 new rows to add\")\n    else:\n        print(d2_latest_time)\n        print(d2_sql)\n        print(\"Device2 contains \" + str(len(d2_result)) + \" new rows to add\")\n        for row in d2_result:\n            print(\"putting row to device2 in GridDB\")\n            row = list(row)\n            del row[1]  # get rid of device name\n            print(row)\n            d2_cont.put(row)\n\n    if not d3_result:\n        print(\"Device3 contains 0 new rows to add\")\n    else:\n        print(d3_latest_time)\n        print(d3_sql)\n        print(\"Device3 contains \" + str(len(d3_result)) + \" new rows to add\")\n        for row in d3_result:\n            print(\"putting row to device3 in GridDB\")\n            row = list(row)\n            del row[1]  # get rid of device name\n            print(row)\n            d3_cont.put(row)<\/code><\/pre>\n<\/div>\n<p>And now that our DAGs are ready, we can head over to the UI, turn them on, and see if they run as expected.<\/p>\n<h3>Activating DAGs and Checking the Results<\/h3>\n<p>Let&#8217;s head back over to the UI and activate these DAGs. 
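Before we do, note that the three near-identical per-device blocks above can be folded into a single loop over a device-to-MAC mapping. The sketch below shows the idea; `DEVICE_MACS` and `push_new_rows` are hypothetical names, and `container` stands in for an object returned by `gridstore.get_container`:

```python
# Sketch: one loop instead of three copies of the same block.
# DEVICE_MACS maps GridDB container names to the sensor MACs stored in PostgreSQL.
DEVICE_MACS = {
    "device1": "b8:27:eb:bf:9d:51",
    "device2": "00:0f:00:70:91:0a",
    "device3": "1c:bf:ce:15:ec:4d",
}

def push_new_rows(container, rows):
    """Drop the device-name column from each row and put it into the container."""
    pushed = 0
    for row in rows:
        row = list(row)
        del row[1]  # column 1 holds the sensor MAC; each container is per-sensor
        container.put(row)
        pushed += 1
    return pushed
```

In the DAG this could then be driven by something like `for name, mac in DEVICE_MACS.items(): push_new_rows(gridstore.get_container(name), results_by_mac[mac])`, where `results_by_mac` is an assumed dict of the per-device query results.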
Our DAG that performs the initial bulk migration is scheduled to run only once. Let&#8217;s turn that one on and make sure it runs. Use the search bar (<code>Search DAGs<\/code>) and search for GridDB.<\/p>\n<p><a href=\"https:\/\/griddb.net\/wp-content\/uploads\/2023\/01\/airflow-4.png\"><img loading=\"lazy\" decoding=\"async\" src=\"https:\/\/griddb.net\/wp-content\/uploads\/2023\/01\/airflow-4.png\" alt=\"\" width=\"1662\" height=\"440\" class=\"aligncenter size-full wp-image-29055\" srcset=\"\/wp-content\/uploads\/2023\/01\/airflow-4.png 1662w, \/wp-content\/uploads\/2023\/01\/airflow-4-300x79.png 300w, \/wp-content\/uploads\/2023\/01\/airflow-4-1024x271.png 1024w, \/wp-content\/uploads\/2023\/01\/airflow-4-768x203.png 768w, \/wp-content\/uploads\/2023\/01\/airflow-4-1536x407.png 1536w, \/wp-content\/uploads\/2023\/01\/airflow-4-600x159.png 600w\" sizes=\"(max-width: 1662px) 100vw, 1662px\" \/><\/a><\/p>\n<p>First, let&#8217;s turn on the DAG with the ID of <code>griddb_postgres_migration_initial<\/code>. Once you click on it, there is a small switch next to its name where you can toggle it on\/off.<\/p>\n<p><a href=\"https:\/\/griddb.net\/wp-content\/uploads\/2023\/01\/airflow-5.png\"><img loading=\"lazy\" decoding=\"async\" src=\"https:\/\/griddb.net\/wp-content\/uploads\/2023\/01\/airflow-5.png\" alt=\"\" width=\"945\" height=\"278\" class=\"aligncenter size-full wp-image-29056\" srcset=\"\/wp-content\/uploads\/2023\/01\/airflow-5.png 945w, \/wp-content\/uploads\/2023\/01\/airflow-5-300x88.png 300w, \/wp-content\/uploads\/2023\/01\/airflow-5-768x226.png 768w, \/wp-content\/uploads\/2023\/01\/airflow-5-600x177.png 600w\" sizes=\"(max-width: 945px) 100vw, 945px\" \/><\/a><\/p>\n<p>Once it is turned on, because we set the start date in the past and the schedule interval to run just once, it will run immediately and then cease firing. To check its logs, click on the graph button and then find your task inside a DAG map. 
Since we only have one task, it&#8217;s simply one small square sitting in space.<\/p>\n<p><a href=\"https:\/\/griddb.net\/wp-content\/uploads\/2023\/01\/airflow-7.png\"><img loading=\"lazy\" decoding=\"async\" src=\"https:\/\/griddb.net\/wp-content\/uploads\/2023\/01\/airflow-7.png\" alt=\"\" width=\"1303\" height=\"412\" class=\"aligncenter size-full wp-image-29058\" srcset=\"\/wp-content\/uploads\/2023\/01\/airflow-7.png 1303w, \/wp-content\/uploads\/2023\/01\/airflow-7-300x95.png 300w, \/wp-content\/uploads\/2023\/01\/airflow-7-1024x324.png 1024w, \/wp-content\/uploads\/2023\/01\/airflow-7-768x243.png 768w, \/wp-content\/uploads\/2023\/01\/airflow-7-600x190.png 600w\" sizes=\"(max-width: 1303px) 100vw, 1303px\" \/><\/a><\/p>\n<p>From there you can click the log button at the top. And a small tip if the results of your DAG are not exactly what you expect: instead of finagling with the scheduling or waiting for another run, you can click the clear button which will auto-force the DAG to run again.<\/p>\n<p><a href=\"https:\/\/griddb.net\/wp-content\/uploads\/2023\/01\/airflow-8.png\"><img loading=\"lazy\" decoding=\"async\" src=\"https:\/\/griddb.net\/wp-content\/uploads\/2023\/01\/airflow-8.png\" alt=\"\" width=\"707\" height=\"652\" class=\"aligncenter size-full wp-image-29059\" srcset=\"\/wp-content\/uploads\/2023\/01\/airflow-8.png 707w, \/wp-content\/uploads\/2023\/01\/airflow-8-300x277.png 300w, \/wp-content\/uploads\/2023\/01\/airflow-8-600x553.png 600w\" sizes=\"(max-width: 707px) 100vw, 707px\" \/><\/a><\/p>\n<p>And now you can look at the logs to see if your task completed successfully.<\/p>\n<p><a href=\"https:\/\/griddb.net\/wp-content\/uploads\/2023\/01\/airflow-6.png\"><img loading=\"lazy\" decoding=\"async\" src=\"https:\/\/griddb.net\/wp-content\/uploads\/2023\/01\/airflow-6.png\" alt=\"\" width=\"1861\" height=\"813\" class=\"aligncenter size-full wp-image-29057\" srcset=\"\/wp-content\/uploads\/2023\/01\/airflow-6.png 1861w, 
\/wp-content\/uploads\/2023\/01\/airflow-6-300x131.png 300w, \/wp-content\/uploads\/2023\/01\/airflow-6-1024x447.png 1024w, \/wp-content\/uploads\/2023\/01\/airflow-6-768x336.png 768w, \/wp-content\/uploads\/2023\/01\/airflow-6-1536x671.png 1536w, \/wp-content\/uploads\/2023\/01\/airflow-6-600x262.png 600w\" sizes=\"(max-width: 1861px) 100vw, 1861px\" \/><\/a><\/p>\n<h4>Querying GridDB<\/h4>\n<p>Luckily for us, it appears as though three containers were successfully made. To double-check, we can write some Python code in a DAG to make sure we can query data from those containers, or we can exec into the GridDB container, drop into the CLI shell, and query that way.<\/p>\n<p>You can drop into the shell like so:<\/p>\n<div class=\"clipboard\">\n<pre><code class=\"language-sh\">$ docker exec -it griddb-airflow gs_sh<\/code><\/pre>\n<\/div>\n<pre><code>The connection attempt was successful(NoSQL).\nThe connection attempt was successful(NewSQL).\ngs&gt;\n<\/code><\/pre>\n<p>And from there you can check your container information:<\/p>\n<div class=\"clipboard\">\n<pre><code class=\"language-sh\">gs[public]> showcontainer device2\nDatabase    : public\nName        : device2\nType        : TIME_SERIES\nPartition ID: 38\nDataAffinity: -\n\nCompression Method : NO\nCompression Window : -\nRow Expiration Time: -\nRow Expiration Division Count: -\n\nColumns:\nNo  Name                  Type            CSTR  RowKey   Compression\n------------------------------------------------------------------------------\n 0  ts                    TIMESTAMP       NN    [RowKey]\n 1  co                    DOUBLE\n 2  humidity              DOUBLE\n 3  light                 BOOL\n 4  lpg                   DOUBLE\n 5  motion                BOOL\n 6  smoke                 DOUBLE\n 7  temperature           DOUBLE<\/code><\/pre>\n<\/div>\n<p>And run a query:<\/p>\n<div class=\"clipboard\">\n<pre><code class=\"language-sh\">gs[public]> select * from device2;\n111,817 results. 
(13 ms)\ngs[public]> get 3\nts,co,humidity,light,lpg,motion,smoke,temperature\n2020-07-12T00:01:34.735Z,0.00284008860710157,76.0,false,0.005114383400977071,false,0.013274836704851536,19.700000762939453\n2020-07-12T00:01:46.869Z,0.002938115626660429,76.0,false,0.005241481841731117,false,0.013627521132019194,19.700000762939453\n2020-07-12T00:02:02.785Z,0.0029050147565559607,75.80000305175781,false,0.0051986974792943095,false,0.013508733329556249,19.700000762939453\nThe 3 results had been acquired.<\/code><\/pre>\n<\/div>\n<h4>Continuous Migration<\/h4>\n<p>Next we will start our second DAG, which we set to run on an hourly schedule:<\/p>\n<div class=\"clipboard\">\n<pre><code class=\"language-python\">with DAG(\n    dag_id='griddb_postgres_migration_continuous',\n    default_args=default_args,\n    start_date=datetime(2022, 12, 19),\n    schedule_interval='0 * * * *'<\/code><\/pre>\n<\/div>\n<p>The <code>schedule_interval<\/code> uses <code>cron<\/code> syntax, but if you are not too familiar with cron, there are also <a href=\"https:\/\/airflow.apache.org\/docs\/apache-airflow\/1.10.1\/scheduler.html#dag-runs\">shortcuts available<\/a>. We could, for example, type in &#8216;@hourly&#8217; instead, and that would be just as valid.<\/p>\n<p>In any case, let&#8217;s add some rows to our PostgreSQL database manually and see the continuous migration in action. 
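As a quick sanity check on what the hourly schedule means for run times: with '0 * * * *', each run fires at the top of an hour. The helper below is plain datetime arithmetic to illustrate that interval, not how Airflow's scheduler actually computes runs (`next_hourly_fire` is a hypothetical name):

```python
from datetime import datetime, timedelta

def next_hourly_fire(after: datetime) -> datetime:
    """Next '0 * * * *' fire time strictly after `after`: the top of the next hour."""
    return after.replace(minute=0, second=0, microsecond=0) + timedelta(hours=1)

print(next_hourly_fire(datetime(2022, 12, 19, 13, 25)))  # 2022-12-19 14:00:00
```

So if the DAG is switched on at 13:25, the first scheduled run would land at 14:00.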
For now, let&#8217;s leave the DAG off while we put some rows in:<\/p>\n<div class=\"clipboard\">\n<pre><code class=\"language-sh\">$ docker exec -it postgres-airflow bash\n\/# psql -U airflow\npsql (13.9 (Debian 13.9-1.pgdg110+1))\nType \"help\" for help.<\/code><\/pre>\n<\/div>\n<p>And now let&#8217;s add some rows.<\/p>\n<div class=\"clipboard\">\n<pre><code class=\"language-sh\">postgres=# INSERT INTO device VALUES (now(), 'b8:27:eb:bf:9d:51', 0.003551, 50.0, false, 0.00754352, false, 0.0232432, 21.6);<\/code><\/pre>\n<\/div>\n<pre><code>INSERT 0 1\n<\/code><\/pre>\n<p>You can do this however many times you&#8217;d like as a test. You can even try inserting with different sensor names too, so that the rows land in different containers.<\/p>\n<p>Once we&#8217;ve done that, let&#8217;s turn on our DAG and check the logs.<\/p>\n<p><a href=\"https:\/\/griddb.net\/wp-content\/uploads\/2023\/01\/airflow-9.png\"><img loading=\"lazy\" decoding=\"async\" src=\"https:\/\/griddb.net\/wp-content\/uploads\/2023\/01\/airflow-9.png\" alt=\"\" width=\"2046\" height=\"702\" class=\"aligncenter size-full wp-image-29060\" srcset=\"\/wp-content\/uploads\/2023\/01\/airflow-9.png 2046w, \/wp-content\/uploads\/2023\/01\/airflow-9-300x103.png 300w, \/wp-content\/uploads\/2023\/01\/airflow-9-1024x351.png 1024w, \/wp-content\/uploads\/2023\/01\/airflow-9-768x264.png 768w, \/wp-content\/uploads\/2023\/01\/airflow-9-1536x527.png 1536w, \/wp-content\/uploads\/2023\/01\/airflow-9-600x206.png 600w\" sizes=\"(max-width: 2046px) 100vw, 2046px\" \/><\/a><\/p>\n<p>In my shell I added five new rows total among the different containers. 
You can see the results here:<\/p>\n<pre><code>[2023-01-12, 16:14:21 PST] {logging_mixin.py:137} INFO - SELECT DISTINCT ON (ts) * FROM device WHERE ts &gt; '2023-01-13 00:04:42.999999' AND device = 'b8:27:eb:bf:9d:51' ORDER BY ts DESC;\n[2023-01-12, 16:14:21 PST] {logging_mixin.py:137} INFO - Device1 contains 4 new rows to add\n[2023-01-12, 16:14:21 PST] {logging_mixin.py:137} INFO - putting row to device1 in GridDB\n[2023-01-12, 16:14:21 PST] {logging_mixin.py:137} INFO - [datetime.datetime(2023, 1, 13, 0, 13, 17, 145397), 0.003551, 50.0, False, 0.00754352, False, 0.0232432, 21.6]\n[2023-01-12, 16:14:21 PST] {logging_mixin.py:137} INFO - putting row to device1 in GridDB\n[2023-01-12, 16:14:21 PST] {logging_mixin.py:137} INFO - [datetime.datetime(2023, 1, 13, 0, 13, 16, 116032), 0.003551, 50.0, False, 0.00754352, False, 0.0232432, 21.6]\n[2023-01-12, 16:14:21 PST] {logging_mixin.py:137} INFO - putting row to device1 in GridDB\n[2023-01-12, 16:14:21 PST] {logging_mixin.py:137} INFO - [datetime.datetime(2023, 1, 13, 0, 13, 15, 321145), 0.003551, 50.0, False, 0.00754352, False, 0.0232432, 21.6]\n[2023-01-12, 16:14:21 PST] {logging_mixin.py:137} INFO - putting row to device1 in GridDB\n[2023-01-12, 16:14:21 PST] {logging_mixin.py:137} INFO - [datetime.datetime(2023, 1, 13, 0, 13, 13, 344482), 0.003551, 50.0, False, 0.00754352, False, 0.0232432, 21.6]\n[2023-01-12, 16:14:21 PST] {logging_mixin.py:137} INFO - 2023-01-12 23:55:08.999999\n[2023-01-12, 16:14:21 PST] {logging_mixin.py:137} INFO - SELECT DISTINCT ON (ts) * FROM device WHERE ts &gt; '2023-01-12 23:55:08.999999' AND device = '00:0f:00:70:91:0a' ORDER BY ts DESC;\n[2023-01-12, 16:14:21 PST] {logging_mixin.py:137} INFO - Device2 contains 1 new rows to add\n[2023-01-12, 16:14:21 PST] {logging_mixin.py:137} INFO - putting row to device2 in GridDB\n[2023-01-12, 16:14:21 PST] {logging_mixin.py:137} INFO - [datetime.datetime(2023, 1, 13, 0, 13, 26, 633364), 0.003551, 50.0, False, 0.00754352, False, 0.0232432, 
21.6]\n[2023-01-12, 16:14:21 PST] {logging_mixin.py:137} INFO - Device3 contains 0 new rows to add\n[2023-01-12, 16:14:21 PST] {python.py:177} INFO - Done. Returned value was: None\n[2023-01-12, 16:14:21 PST] {taskinstance.py:1322} INFO - Marking task as SUCCESS. dag_id=griddb_postgres_migration_continuous, task_id=griddb_postgres_migration_continuous, execution_date=20230112T230000, start_date=20230113T001421, end_date=20230113T001421\n<\/code><\/pre>\n<p>And then of course, if you want to be diligent, you can pop back over to the GridDB shell and make sure your rows are being inserted:<\/p>\n<div class=\"clipboard\">\n<pre><code class=\"language-sh\">gs[public]> tql device1 select MAX(ts);\n1 results. (15 ms)\ngs[public]> get\nResult\n2023-01-13T00:13:17.145Z\nThe 1 results had been acquired.<\/code><\/pre>\n<\/div>\n<p>It matches!<\/p>\n<h2>Conclusion<\/h2>\n<p>And with that, we have successfully set up Apache Airflow via Docker and configured our data to migrate continuously from PostgreSQL to GridDB.<\/p>\n","protected":false},"excerpt":{"rendered":"<p>In a previous blog, we discussed migrating a sample IoT dataset from PostgreSQL to GridDB. To accomplish our feat, we used the official GridDB import\/export tools, walking through exactly how to use the tool, along with considering WHY a user may want to shift from using PostgreSQL over to GridDB. 
You can read about that [&hellip;]<\/p>\n","protected":false},"author":1,"featured_media":29108,"comment_status":"open","ping_status":"open","sticky":false,"template":"","format":"standard","meta":{"_acf_changed":false,"footnotes":""},"categories":[121],"tags":[],"class_list":["post-46742","post","type-post","status-publish","format-standard","has-post-thumbnail","hentry","category-blog"],"acf":[],"yoast_head":"<!-- This site is optimized with the Yoast SEO plugin v27.1.1 - https:\/\/yoast.com\/product\/yoast-seo-wordpress\/ -->\n<title>Scheduling Data Migration from PostgreSQL to GridDB using Apache Airflow | GridDB: Open Source Time Series Database for IoT<\/title>\n<meta name=\"description\" content=\"In a previous blog, we discussed migrating a sample IoT dataset from PostgreSQL to GridDB. To accomplish our feat, we used the official GridDB\" \/>\n<meta name=\"robots\" content=\"index, follow, max-snippet:-1, max-image-preview:large, max-video-preview:-1\" \/>\n<link rel=\"canonical\" href=\"https:\/\/griddb.net\/en\/blog\/scheduling-data-migration-from-postgresql-to-griddb-using-apache-airflow\/\" \/>\n<meta property=\"og:locale\" content=\"en_US\" \/>\n<meta property=\"og:type\" content=\"article\" \/>\n<meta property=\"og:title\" content=\"Scheduling Data Migration from PostgreSQL to GridDB using Apache Airflow | GridDB: Open Source Time Series Database for IoT\" \/>\n<meta property=\"og:description\" content=\"In a previous blog, we discussed migrating a sample IoT dataset from PostgreSQL to GridDB. 
To accomplish our feat, we used the official GridDB\" \/>\n<meta property=\"og:url\" content=\"https:\/\/griddb.net\/en\/blog\/scheduling-data-migration-from-postgresql-to-griddb-using-apache-airflow\/\" \/>\n<meta property=\"og:site_name\" content=\"GridDB: Open Source Time Series Database for IoT\" \/>\n<meta property=\"article:publisher\" content=\"https:\/\/www.facebook.com\/griddbcommunity\/\" \/>\n<meta property=\"article:published_time\" content=\"2023-02-02T08:00:00+00:00\" \/>\n<meta property=\"article:modified_time\" content=\"2025-11-13T20:56:25+00:00\" \/>\n<meta property=\"og:image\" content=\"https:\/\/www.griddb.net\/wp-content\/uploads\/2023\/01\/airflowandgriddb.png\" \/>\n\t<meta property=\"og:image:width\" content=\"1160\" \/>\n\t<meta property=\"og:image:height\" content=\"653\" \/>\n\t<meta property=\"og:image:type\" content=\"image\/png\" \/>\n<meta name=\"author\" content=\"Israel\" \/>\n<meta name=\"twitter:card\" content=\"summary_large_image\" \/>\n<meta name=\"twitter:creator\" content=\"@GridDBCommunity\" \/>\n<meta name=\"twitter:site\" content=\"@GridDBCommunity\" \/>\n<meta name=\"twitter:label1\" content=\"Written by\" \/>\n\t<meta name=\"twitter:data1\" content=\"Israel\" \/>\n\t<meta name=\"twitter:label2\" content=\"Est. 
reading time\" \/>\n\t<meta name=\"twitter:data2\" content=\"25 minutes\" \/>\n<script type=\"application\/ld+json\" class=\"yoast-schema-graph\">{\"@context\":\"https:\/\/schema.org\",\"@graph\":[{\"@type\":\"Article\",\"@id\":\"https:\/\/griddb.net\/en\/blog\/scheduling-data-migration-from-postgresql-to-griddb-using-apache-airflow\/#article\",\"isPartOf\":{\"@id\":\"https:\/\/griddb.net\/en\/blog\/scheduling-data-migration-from-postgresql-to-griddb-using-apache-airflow\/\"},\"author\":{\"name\":\"Israel\",\"@id\":\"https:\/\/griddb.net\/en\/#\/schema\/person\/c8a430e7156a9e10af73b1fbb46c2740\"},\"headline\":\"Scheduling Data Migration from PostgreSQL to GridDB using Apache Airflow\",\"datePublished\":\"2023-02-02T08:00:00+00:00\",\"dateModified\":\"2025-11-13T20:56:25+00:00\",\"mainEntityOfPage\":{\"@id\":\"https:\/\/griddb.net\/en\/blog\/scheduling-data-migration-from-postgresql-to-griddb-using-apache-airflow\/\"},\"wordCount\":2981,\"commentCount\":0,\"publisher\":{\"@id\":\"https:\/\/griddb.net\/en\/#organization\"},\"image\":{\"@id\":\"https:\/\/griddb.net\/en\/blog\/scheduling-data-migration-from-postgresql-to-griddb-using-apache-airflow\/#primaryimage\"},\"thumbnailUrl\":\"\/wp-content\/uploads\/2023\/01\/airflowandgriddb.png\",\"articleSection\":[\"Blog\"],\"inLanguage\":\"en-US\",\"potentialAction\":[{\"@type\":\"CommentAction\",\"name\":\"Comment\",\"target\":[\"https:\/\/griddb.net\/en\/blog\/scheduling-data-migration-from-postgresql-to-griddb-using-apache-airflow\/#respond\"]}]},{\"@type\":\"WebPage\",\"@id\":\"https:\/\/griddb.net\/en\/blog\/scheduling-data-migration-from-postgresql-to-griddb-using-apache-airflow\/\",\"url\":\"https:\/\/griddb.net\/en\/blog\/scheduling-data-migration-from-postgresql-to-griddb-using-apache-airflow\/\",\"name\":\"Scheduling Data Migration from PostgreSQL to GridDB using Apache Airflow | GridDB: Open Source Time Series Database for 
IoT\",\"isPartOf\":{\"@id\":\"https:\/\/griddb.net\/en\/#website\"},\"primaryImageOfPage\":{\"@id\":\"https:\/\/griddb.net\/en\/blog\/scheduling-data-migration-from-postgresql-to-griddb-using-apache-airflow\/#primaryimage\"},\"image\":{\"@id\":\"https:\/\/griddb.net\/en\/blog\/scheduling-data-migration-from-postgresql-to-griddb-using-apache-airflow\/#primaryimage\"},\"thumbnailUrl\":\"\/wp-content\/uploads\/2023\/01\/airflowandgriddb.png\",\"datePublished\":\"2023-02-02T08:00:00+00:00\",\"dateModified\":\"2025-11-13T20:56:25+00:00\",\"description\":\"In a previous blog, we discussed migrating a sample IoT dataset from PostgreSQL to GridDB. To accomplish our feat, we used the official GridDB\",\"inLanguage\":\"en-US\",\"potentialAction\":[{\"@type\":\"ReadAction\",\"target\":[\"https:\/\/griddb.net\/en\/blog\/scheduling-data-migration-from-postgresql-to-griddb-using-apache-airflow\/\"]}]},{\"@type\":\"ImageObject\",\"inLanguage\":\"en-US\",\"@id\":\"https:\/\/griddb.net\/en\/blog\/scheduling-data-migration-from-postgresql-to-griddb-using-apache-airflow\/#primaryimage\",\"url\":\"\/wp-content\/uploads\/2023\/01\/airflowandgriddb.png\",\"contentUrl\":\"\/wp-content\/uploads\/2023\/01\/airflowandgriddb.png\",\"width\":1160,\"height\":653},{\"@type\":\"WebSite\",\"@id\":\"https:\/\/griddb.net\/en\/#website\",\"url\":\"https:\/\/griddb.net\/en\/\",\"name\":\"GridDB: Open Source Time Series Database for IoT\",\"description\":\"GridDB is an open source time-series database with the performance of NoSQL and convenience of 
SQL\",\"publisher\":{\"@id\":\"https:\/\/griddb.net\/en\/#organization\"},\"potentialAction\":[{\"@type\":\"SearchAction\",\"target\":{\"@type\":\"EntryPoint\",\"urlTemplate\":\"https:\/\/griddb.net\/en\/?s={search_term_string}\"},\"query-input\":{\"@type\":\"PropertyValueSpecification\",\"valueRequired\":true,\"valueName\":\"search_term_string\"}}],\"inLanguage\":\"en-US\"},{\"@type\":\"Organization\",\"@id\":\"https:\/\/griddb.net\/en\/#organization\",\"name\":\"Fixstars\",\"url\":\"https:\/\/griddb.net\/en\/\",\"logo\":{\"@type\":\"ImageObject\",\"inLanguage\":\"en-US\",\"@id\":\"https:\/\/griddb.net\/en\/#\/schema\/logo\/image\/\",\"url\":\"https:\/\/griddb.net\/wp-content\/uploads\/2019\/04\/fixstars_logo_web_tagline.png\",\"contentUrl\":\"https:\/\/griddb.net\/wp-content\/uploads\/2019\/04\/fixstars_logo_web_tagline.png\",\"width\":200,\"height\":83,\"caption\":\"Fixstars\"},\"image\":{\"@id\":\"https:\/\/griddb.net\/en\/#\/schema\/logo\/image\/\"},\"sameAs\":[\"https:\/\/www.facebook.com\/griddbcommunity\/\",\"https:\/\/x.com\/GridDBCommunity\",\"https:\/\/www.linkedin.com\/company\/griddb-by-toshiba\"]},{\"@type\":\"Person\",\"@id\":\"https:\/\/griddb.net\/en\/#\/schema\/person\/c8a430e7156a9e10af73b1fbb46c2740\",\"name\":\"Israel\",\"image\":{\"@type\":\"ImageObject\",\"inLanguage\":\"en-US\",\"@id\":\"https:\/\/griddb.net\/en\/#\/schema\/person\/image\/\",\"url\":\"https:\/\/secure.gravatar.com\/avatar\/4df8cfc155402a2928d11f80b0220037b8bd26c4f1b19c4598d826e0306e6307?s=96&d=mm&r=g\",\"contentUrl\":\"https:\/\/secure.gravatar.com\/avatar\/4df8cfc155402a2928d11f80b0220037b8bd26c4f1b19c4598d826e0306e6307?s=96&d=mm&r=g\",\"caption\":\"Israel\"},\"url\":\"https:\/\/www.griddb.net\/en\/author\/israel\/\"}]}<\/script>\n<!-- \/ Yoast SEO plugin. 
-->","yoast_head_json":{"title":"Scheduling Data Migration from PostgreSQL to GridDB using Apache Airflow | GridDB: Open Source Time Series Database for IoT","description":"In a previous blog, we discussed migrating a sample IoT dataset from PostgreSQL to GridDB. To accomplish our feat, we used the official GridDB","robots":{"index":"index","follow":"follow","max-snippet":"max-snippet:-1","max-image-preview":"max-image-preview:large","max-video-preview":"max-video-preview:-1"},"canonical":"https:\/\/griddb.net\/en\/blog\/scheduling-data-migration-from-postgresql-to-griddb-using-apache-airflow\/","og_locale":"en_US","og_type":"article","og_title":"Scheduling Data Migration from PostgreSQL to GridDB using Apache Airflow | GridDB: Open Source Time Series Database for IoT","og_description":"In a previous blog, we discussed migrating a sample IoT dataset from PostgreSQL to GridDB. To accomplish our feat, we used the official GridDB","og_url":"https:\/\/griddb.net\/en\/blog\/scheduling-data-migration-from-postgresql-to-griddb-using-apache-airflow\/","og_site_name":"GridDB: Open Source Time Series Database for IoT","article_publisher":"https:\/\/www.facebook.com\/griddbcommunity\/","article_published_time":"2023-02-02T08:00:00+00:00","article_modified_time":"2025-11-13T20:56:25+00:00","og_image":[{"width":1160,"height":653,"url":"https:\/\/www.griddb.net\/wp-content\/uploads\/2023\/01\/airflowandgriddb.png","type":"image\/png"}],"author":"Israel","twitter_card":"summary_large_image","twitter_creator":"@GridDBCommunity","twitter_site":"@GridDBCommunity","twitter_misc":{"Written by":"Israel","Est. 
reading time":"25 minutes"},"schema":{"@context":"https:\/\/schema.org","@graph":[{"@type":"Article","@id":"https:\/\/griddb.net\/en\/blog\/scheduling-data-migration-from-postgresql-to-griddb-using-apache-airflow\/#article","isPartOf":{"@id":"https:\/\/griddb.net\/en\/blog\/scheduling-data-migration-from-postgresql-to-griddb-using-apache-airflow\/"},"author":{"name":"Israel","@id":"https:\/\/griddb.net\/en\/#\/schema\/person\/c8a430e7156a9e10af73b1fbb46c2740"},"headline":"Scheduling Data Migration from PostgreSQL to GridDB using Apache Airflow","datePublished":"2023-02-02T08:00:00+00:00","dateModified":"2025-11-13T20:56:25+00:00","mainEntityOfPage":{"@id":"https:\/\/griddb.net\/en\/blog\/scheduling-data-migration-from-postgresql-to-griddb-using-apache-airflow\/"},"wordCount":2981,"commentCount":0,"publisher":{"@id":"https:\/\/griddb.net\/en\/#organization"},"image":{"@id":"https:\/\/griddb.net\/en\/blog\/scheduling-data-migration-from-postgresql-to-griddb-using-apache-airflow\/#primaryimage"},"thumbnailUrl":"\/wp-content\/uploads\/2023\/01\/airflowandgriddb.png","articleSection":["Blog"],"inLanguage":"en-US","potentialAction":[{"@type":"CommentAction","name":"Comment","target":["https:\/\/griddb.net\/en\/blog\/scheduling-data-migration-from-postgresql-to-griddb-using-apache-airflow\/#respond"]}]},{"@type":"WebPage","@id":"https:\/\/griddb.net\/en\/blog\/scheduling-data-migration-from-postgresql-to-griddb-using-apache-airflow\/","url":"https:\/\/griddb.net\/en\/blog\/scheduling-data-migration-from-postgresql-to-griddb-using-apache-airflow\/","name":"Scheduling Data Migration from PostgreSQL to GridDB using Apache Airflow | GridDB: Open Source Time Series Database for 
IoT","isPartOf":{"@id":"https:\/\/griddb.net\/en\/#website"},"primaryImageOfPage":{"@id":"https:\/\/griddb.net\/en\/blog\/scheduling-data-migration-from-postgresql-to-griddb-using-apache-airflow\/#primaryimage"},"image":{"@id":"https:\/\/griddb.net\/en\/blog\/scheduling-data-migration-from-postgresql-to-griddb-using-apache-airflow\/#primaryimage"},"thumbnailUrl":"\/wp-content\/uploads\/2023\/01\/airflowandgriddb.png","datePublished":"2023-02-02T08:00:00+00:00","dateModified":"2025-11-13T20:56:25+00:00","description":"In a previous blog, we discussed migrating a sample IoT dataset from PostgreSQL to GridDB. To accomplish our feat, we used the official GridDB","inLanguage":"en-US","potentialAction":[{"@type":"ReadAction","target":["https:\/\/griddb.net\/en\/blog\/scheduling-data-migration-from-postgresql-to-griddb-using-apache-airflow\/"]}]},{"@type":"ImageObject","inLanguage":"en-US","@id":"https:\/\/griddb.net\/en\/blog\/scheduling-data-migration-from-postgresql-to-griddb-using-apache-airflow\/#primaryimage","url":"\/wp-content\/uploads\/2023\/01\/airflowandgriddb.png","contentUrl":"\/wp-content\/uploads\/2023\/01\/airflowandgriddb.png","width":1160,"height":653},{"@type":"WebSite","@id":"https:\/\/griddb.net\/en\/#website","url":"https:\/\/griddb.net\/en\/","name":"GridDB: Open Source Time Series Database for IoT","description":"GridDB is an open source time-series database with the performance of NoSQL and convenience of 
SQL","publisher":{"@id":"https:\/\/griddb.net\/en\/#organization"},"potentialAction":[{"@type":"SearchAction","target":{"@type":"EntryPoint","urlTemplate":"https:\/\/griddb.net\/en\/?s={search_term_string}"},"query-input":{"@type":"PropertyValueSpecification","valueRequired":true,"valueName":"search_term_string"}}],"inLanguage":"en-US"},{"@type":"Organization","@id":"https:\/\/griddb.net\/en\/#organization","name":"Fixstars","url":"https:\/\/griddb.net\/en\/","logo":{"@type":"ImageObject","inLanguage":"en-US","@id":"https:\/\/griddb.net\/en\/#\/schema\/logo\/image\/","url":"https:\/\/griddb.net\/wp-content\/uploads\/2019\/04\/fixstars_logo_web_tagline.png","contentUrl":"https:\/\/griddb.net\/wp-content\/uploads\/2019\/04\/fixstars_logo_web_tagline.png","width":200,"height":83,"caption":"Fixstars"},"image":{"@id":"https:\/\/griddb.net\/en\/#\/schema\/logo\/image\/"},"sameAs":["https:\/\/www.facebook.com\/griddbcommunity\/","https:\/\/x.com\/GridDBCommunity","https:\/\/www.linkedin.com\/company\/griddb-by-toshiba"]},{"@type":"Person","@id":"https:\/\/griddb.net\/en\/#\/schema\/person\/c8a430e7156a9e10af73b1fbb46c2740","name":"Israel","image":{"@type":"ImageObject","inLanguage":"en-US","@id":"https:\/\/griddb.net\/en\/#\/schema\/person\/image\/","url":"https:\/\/secure.gravatar.com\/avatar\/4df8cfc155402a2928d11f80b0220037b8bd26c4f1b19c4598d826e0306e6307?s=96&d=mm&r=g","contentUrl":"https:\/\/secure.gravatar.com\/avatar\/4df8cfc155402a2928d11f80b0220037b8bd26c4f1b19c4598d826e0306e6307?s=96&d=mm&r=g","caption":"Israel"},"url":"https:\/\/www.griddb.net\/en\/author\/israel\/"}]}},"_links":{"self":[{"href":"https:\/\/www.griddb.net\/en\/wp-json\/wp\/v2\/posts\/46742","targetHints":{"allow":["GET"]}}],"collection":[{"href":"https:\/\/www.griddb.net\/en\/wp-json\/wp\/v2\/posts"}],"about":[{"href":"https:\/\/www.griddb.net\/en\/wp-json\/wp\/v2\/types\/post"}],"author":[{"embeddable":true,"href":"https:\/\/www.griddb.net\/en\/wp-json\/wp\/v2\/users\/1"}],"replies":[{"embeddabl
e":true,"href":"https:\/\/www.griddb.net\/en\/wp-json\/wp\/v2\/comments?post=46742"}],"version-history":[{"count":1,"href":"https:\/\/www.griddb.net\/en\/wp-json\/wp\/v2\/posts\/46742\/revisions"}],"predecessor-version":[{"id":51411,"href":"https:\/\/www.griddb.net\/en\/wp-json\/wp\/v2\/posts\/46742\/revisions\/51411"}],"wp:featuredmedia":[{"embeddable":true,"href":"https:\/\/www.griddb.net\/en\/wp-json\/wp\/v2\/media\/29108"}],"wp:attachment":[{"href":"https:\/\/www.griddb.net\/en\/wp-json\/wp\/v2\/media?parent=46742"}],"wp:term":[{"taxonomy":"category","embeddable":true,"href":"https:\/\/www.griddb.net\/en\/wp-json\/wp\/v2\/categories?post=46742"},{"taxonomy":"post_tag","embeddable":true,"href":"https:\/\/www.griddb.net\/en\/wp-json\/wp\/v2\/tags?post=46742"}],"curies":[{"name":"wp","href":"https:\/\/api.w.org\/{rel}","templated":true}]}}