Insert into postgres not working

Dear OpenFn community,
I have two issues with the PostgreSQL adaptor in a self-hosted OpenFn application running in a Dockerized environment.

  1. The idea is to send a POST request with some JSON-formatted data and store the contents of the input JSON as a row in a PostgreSQL table called “payload”, using this job:
// Get started by adding operations from the API reference

fn( state => {
  insert('payload', {data: state.data }, {writeSql: true, setNull: "'NaN'", logValues: true });
    return state;
});

I get a notification that the job succeeded and there is no error in the Docker console, but the “payload” table is still empty. However, the “dataclips” table shows the data as expected, which should come directly from the OpenFn application.

  2. Furthermore, it does not seem to matter whether I provide an API token in the request (sent with Postman): the job executes in both cases.

The log in the Docker container indicates that it ran through:

cmd:
openfn execute \
  -a @openfn/language-postgresql@latest=/app/priv/openfn/lib/node_modules/@openfn/language-postgresql-4.1.7 \
  -s /tmp/state-1694596536-1-1mwmiwk.json \
  --no-strict-output \
  -l info \
  -o /tmp/output-1694596536-1-v66c91.json \
  /tmp/expression-1694596536-1-1f7rpcu.js


[debug]
f5070 : [CLI] ℹ Versions:
f5070 :          ▸ node.js                        18.17.1
f5070 :          ▸ cli                            0.0.35
f5070 :          ▸ runtime                        0.0.21
f5070 :          ▸ compiler                       0.0.29
f5070 :          ▸ @openfn/language-postgresql    4.1.7
f5070 : [CLI] ✔ Loaded state from /tmp/state-1694596536-1-1mwmiwk.json
f5070 : [CLI] ℹ Added import statement for @openfn/language-postgresql
f5070 : [CLI] ℹ Added export * statement for @openfn/language-postgresql
f5070 : [CLI] ✔ Compiled job from /tmp/expression-1694596536-1-1f7rpcu.js
f5070 : [R/T] ℹ Resolved adaptor @openfn/language-postgresql to version 4.1.7
f5070 : [R/T] ✔ Operation 1 complete in 0ms
f5070 : [CLI] ✔ Writing output to /tmp/output-1694596536-1-v66c91.json
f5070 : [CLI] ✔ Done in 632ms! ✨
f5070 :
[debug] QUERY OK db=0.4ms idle=970.8ms
begin []
[debug] QUERY OK db=0.6ms

hey @piische-tph , looks like that insert() is wrapped inside an fn block.

fn blocks are useful if you want to write some custom JavaScript and do stuff to your state object in ways that aren’t handled by regular operations. But here, it looks like you’re only trying to insert a record, so use that insert function without wrapping it up like that.

Your job should look like this:

insert(
  'payload',
  state.data,
  { writeSql: true, setNull: "'NaN'", logValues: true }
);

Or if you wanted to do some really fancy footwork (though this is definitely an anti-pattern!) you could break out into one of those custom fn blocks (think of this as an escape hatch) and then manually call the insert operation with state like this:

fn(state => {
  return insert('payload', {data: state.data }, {writeSql: true, setNull: "'NaN'", logValues: true })(state);
});

See how I return the operation when called with (state)? I go from insert(x) to return insert(x)(state);

But that would be bad :slight_smile: . You’d almost be replicating the OpenFn internals again in your job code. The “contract” that we make is to take each operation in a job expression and call it with state… so a normal job might look like this:

get(...);
get(...);
post(...);
fn(...);
insert(...);

Each of those 5 operations will return a function that can be called with state. You can run compile to see how that list of operations is turned into actual JavaScript.

In essence, we take this list of operations and call each one with state, then pass the output of the first to the second, etc., etc. It’s a big reducer pipeline. You give us insert() and we call insert(...)(state) and then pass the result (as “state”) to the next item in the list of operations.
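
To make the pipeline idea concrete, here is a minimal sketch in plain JavaScript. The `insert`, `fn`, and `execute` functions below are hypothetical stand-ins written for illustration, not the real adaptor or runtime code, and the real runtime is asynchronous while this sketch is synchronous. It shows why an `insert(...)` built inside an `fn` block and never called with state writes nothing.

```javascript
// Hypothetical stand-ins for illustration only; not the real OpenFn code.
const written = [];

// An "operation factory": calling insert(...) only builds a function.
// Nothing is written until that function is called with state.
const insert = (table, record) => (state) => {
  const resolved = typeof record === 'function' ? record(state) : record;
  written.push({ table, record: resolved });
  return state;
};

// fn() wraps a callback so it can sit in the list like any other operation.
const fn = (callback) => (state) => callback(state);

// Simplified, synchronous runner: call each operation with state and pass
// the result on to the next one.
const execute = (operations, initialState) =>
  operations.reduce((state, operation) => operation(state), initialState);

const initialState = { data: { name: 'example' } };

// Case 1: insert(...) is built inside fn() but never called with state.
execute(
  [
    fn((state) => {
      insert('payload', { data: state.data }); // builds a function, then drops it
      return state;
    }),
  ],
  initialState
);
const writesAfterWrapped = written.length;

// Case 2: insert(...) sits at the top level, so the runner calls it with state.
execute([insert('payload', (state) => state.data)], initialState);
const writesAfterTopLevel = written.length;

console.log({ writesAfterWrapped, writesAfterTopLevel });
```

In this sketch the first run records no writes and the second records exactly one, which mirrors the "job succeeded but table is empty" symptom from the original question.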

So… tl;dr: drop the fn(...) unless you’re specifically messing around with how you want OpenFn to execute your operations.


Thank you so much @taylordowns2000,

That solved my main issue. I used the insert function without the fn() as you suggested, and now it works and stores the content in the given table of the Postgres database. Thanks for the quick help!

As you suggested I kept it simple :smiley: But thanks for the insights into how I could achieve more complex scenarios.

Do you also have an idea how I can make the webhook accessible only to users who provide an API token? Right now it seems that anyone with the correct webhook link can post to the endpoint, or am I wrong?


Webhook security is a feature currently only available in the OpenFn platform V1 (see docs), but it’s coming up on our roadmap for Lightning V2. You can track the issue here and we can follow up once delivered in the coming weeks!


Thanks @aleksa-krolls,

okay then I will subscribe / track the issue and implement it once it is available.

Hello,

In my case, I need to retrieve approximately 630 submissions from KoboToolbox and insert them into a PostgreSQL database.

Based on the documentation, I see that I can use either upsertMany() or upsert() for this operation.

Could you provide guidance on how to implement these two methods? Below is a sample of the submissions I am fetching from KoboToolbox.

NOTE: The job that fetches submissions from KoboToolbox returns all 630 submissions in one go.

Kindly help @taylordowns2000, @aleksa-krolls and @piische-tph @joe


{
"data": [
  {
      "Email_Address": "xxx@gmail.com",
      "First_Name": "Vi",
      "Full_Address": "full Name",
      "Last_Name": "Rath",
      "Phone_Number": "0779887711",
      "Rating_Testing/from_1___5": "option_4",
      "Send_you_Video": "9917211914725.mp4",
      "Send_your_Audio": "771721191211697.m4a",
      "Send_your_Image": "0091721191097122.jpg",
      "Send_your_Location": "11.5668611 104.8216572 4.0 35.181",
      "__version__": "viG4SDZehe2qopft9ZMTzn",
      "_attachments": [
        {
          "download_large_url": "https://kf.k8s-gke.gov/api/v2/assets/atKJtMNAno2yfe56hHj8w9/data/4/attachments/1/?format=json",
          "download_medium_url": "https://kf.k8s-gke.gov/api/v2/assets/atKJtMNAno2yfe56hHj8w9/data/4/attachments/1/?format=json",
          "download_small_url": "https://kf.k8s-gke.gov/api/v2/assets/atKJtMNAno2yfe56hHj8w9/data/4/attachments/1/?format=json",
          "download_url": "https://kf.k8s-gke.gov/v2/assets/atKJtMNAno2yfe56hHj8w9/data/4/attachments/1/?format=json",
          "filename": "super_admin/attachments/e0b8fe1e2e4a4eff96f80b9c72862995/ee0f3a60-e425-42a8-9882-f21e55b9b8b3/1721191097122.jpg",
          "id": 1,
          "instance": 4,
          "mimetype": "image/jpeg",
          "question_xpath": "Send_your_Image",
          "xform": 1
        },
        {
          "download_large_url": "https://kf.k8s-gke.gov/api/v2/assets/atKJtMNAno2yfe56hHj8w9/data/4/attachments/2/?format=json",
          "download_medium_url": "https://kf.k8s-gke.gov/api/v2/assets/atKJtMNAno2yfe56hHj8w9/data/4/attachments/2/?format=json",
          "download_small_url": "https://kf.k8s-gke.gov/api/v2/assets/atKJtMNAno2yfe56hHj8w9/data/4/attachments/2/?format=json",
          "download_url": "https://kf.k8s-gke.gov/api/v2/assets/atKJtMNAno2yfe56hHj8w9/data/4/attachments/2/?format=json",
          "filename": "super_admin/attachments/e0b8fe1e2e4a4eff96f80b9c72862995/ee0f3a60-e425-42a8-9882-f21e55b9b8b3/1721191114725.mp4",
          "id": 2,
          "instance": 4,
          "mimetype": "video/mp4",
          "question_xpath": "Send_you_Video",
          "xform": 1
        },
        {
          "download_large_url": "https://kf.k8s-gke.gov/api/v2/assets/atKJtMNAno2yfe56hHj8w9/data/4/attachments/3/?format=json",
          "download_medium_url": "https://kf.k8s-gke.gov/api/v2/assets/atKJtMNAno2yfe56hHj8w9/data/4/attachments/3/?format=json",
          "download_small_url": "https://kf.k8s-gke.gov/api/v2/assets/atKJtMNAno2yfe56hHj8w9/data/4/attachments/3/?format=json",
          "download_url": "https://kf.k8s-gke.gov/api/v2/assets/atKJtMNAno2yfe56hHj8w9/data/4/attachments/3/?format=json",
          "filename": "super_admin/attachments/e0b8fe1e2e4a4eff96f80b9c72862995/ee0f3a60-e425-42a8-9882-f21e55b9b8b3/1721191211697.m4a",
          "id": 3,
          "instance": 4,
          "mimetype": "audio/mpeg",
          "question_xpath": "Send_your_Audio",
          "xform": 1
        }
      ],
      "_geolocation": [
        11.5668611,
        104.8216572
      ],
      "_id": 4,
      "_notes": [],
      "_status": "submitted_via_web",
      "_submission_time": "2024-07-17T04:40:26",
      "_submitted_by": "super_admin",
      "_supplementalDetails": {},
      "_tags": [],
      "_uuid": "ee0f3a60-e425-42a8-9882-f21e55b9b8b3",
      "_validation_status": {},
      "_xform_id_string": "atKJtMNAno2yfe56hHj8w9",
      "end": "2024-07-17T11:40:19.875+07:00",
      "formhub/uuid": "e0b8fe1e2e4a4eff96f80b9c72862995",
      "meta/instanceID": "uuid:kkktrsaee0f3a60-e425-42a8-9882-f21e55b9b8b3",
      "start": "2024-07-17T11:37:28.852+07:00"
    },
    {
      "Email_Address": "zzz@gmail.com",
      "First_Name": "Rath",
      "Full_Address": "Svayrieng",
      "Last_Name": "Vi",
      "Phone_Number": "0888501747",
      "Rating_Testing/from_1___5": "option_2",
      "Send_you_Video": "1721191282422.mp4",
      "Send_your_Audio": "1721191310068.m4a",
      "Send_your_Image": "1721191273629.jpg",
      "Send_your_Location": "11.5668796 104.8216724 4.0 15.176",
      "__version__": "viG4SDZehe2qopft9ZMTzn",
      "_attachments": [
        {
          "download_large_url": "https://kf.k8s-gke.gov/api/v2/assets/atKJtMNAno2yfe56hHj8w9/data/5/attachments/4/?format=json",
          "download_medium_url": "https://kf.k8s-gke.gov/api/v2/assets/atKJtMNAno2yfe56hHj8w9/data/5/attachments/4/?format=json",
          "download_small_url": "https://kf.k8s-gke.gov/api/v2/assets/atKJtMNAno2yfe56hHj8w9/data/5/attachments/4/?format=json",
          "download_url": "https://kf.k8s-gke.gov/api/v2/assets/atKJtMNAno2yfe56hHj8w9/data/5/attachments/4/?format=json",
          "filename": "super_admin/attachments/e0b8fe1e2e4a4eff96f80b9c72862995/c67dac1e-9e1e-4f73-a962-8f841152d8ab/1721191282422.mp4",
          "id": 4,
          "instance": 5,
          "mimetype": "video/mp4",
          "question_xpath": "Send_you_Video",
          "xform": 1
        },
        {
          "download_large_url": "https://kf.k8s-gke.gov/v2/assets/atKJtMNAno2yfe56hHj8w9/data/5/attachments/5/?format=json",
          "download_medium_url": "https://kf.k8s-gke.gov/api/v2/assets/atKJtMNAno2yfe56hHj8w9/data/5/attachments/5/?format=json",
          "download_small_url": "https://kf.k8s-gke.gov/api/v2/assets/atKJtMNAno2yfe56hHj8w9/data/5/attachments/5/?format=json",
          "download_url": "https://kf.k8s-gke.gov/api/v2/assets/atKJtMNAno2yfe56hHj8w9/data/5/attachments/5/?format=json",
          "filename": "super_admin/attachments/e0b8fe1e2e4a4eff96f80b9c72862995/c67dac1e-9e1e-4f73-a962-8f841152d8ab/1721191273629.jpg",
          "id": 5,
          "instance": 5,
          "mimetype": "image/jpeg",
          "question_xpath": "Send_your_Image",
          "xform": 1
        },
        {
          "download_large_url": "https://kf.k8s-gke.gov/api/v2/assets/atKJtMNAno2yfe56hHj8w9/data/5/attachments/6/?format=json",
          "download_medium_url": "https://kf.k8s-gke.gov/api/v2/assets/atKJtMNAno2yfe56hHj8w9/data/5/attachments/6/?format=json",
          "download_small_url": "https://kf.k8s-gke.gov/api/v2/assets/atKJtMNAno2yfe56hHj8w9/data/5/attachments/6/?format=json",
          "download_url": "https://kf.k8s-gke.gov/api/v2/assets/atKJtMNAno2yfe56hHj8w9/data/5/attachments/6/?format=json",
          "filename": "super_admin/attachments/e0b8fe1e2e4a4eff96f80b9c72862995/c67dac1e-9e1e-4f73-a962-8f841152d8ab/1721191310068.m4a",
          "id": 6,
          "instance": 5,
          "mimetype": "audio/mpeg",
          "question_xpath": "Send_your_Audio",
          "xform": 1
        }
      ],
      "_geolocation": [
        11.5668796,
        104.8216724
      ],
      "_id": 5,
      "_notes": [],
      "_status": "submitted_via_web",
      "_submission_time": "2024-07-17T04:42:07",
      "_submitted_by": "super_admin",
      "_supplementalDetails": {},
      "_tags": [],
      "_uuid": "c67dac1e-9e1e-4f73-a962-8f841152d8ab",
      "_validation_status": {},
      "_xform_id_string": "atKJtMNAno2yfe56hHj8w9",
      "end": "2024-07-17T11:41:54.117+07:00",
      "formhub/uuid": "e0b8fe1e2e4a4eff96f80b9c72862995",
      "meta/instanceID": "uuid:c67dac1e-9e1e-4f73-a962-8f841152d8ab",
      "start": "2024-07-17T11:40:31.668+07:00"
    }
  ]
}

Hiya @stephencoduor, I have written some sample code to help you understand how you can map your form submission data and use upsertMany to add it to a PostgreSQL DB.

So let’s say in your workflow
Step 1: Fetch Form Submission adaptor: kobotoolbox
Using getSubmissions will put the result you shared in state.data

getSubmissions(formId, { query: { _submission_time: { $gte: "2025-04-02" } } }); // => state.data = [{...}, ...]

Step 2: Map Submission Data adaptor: common
In this step you will map and transform your form submissions to your DB table columns.

// Map the submission data to your DB schema (table columns)
// Make sure all required fields are present in the submission data
fn((state) => {
  state.submissionMapping = state.data.map((submission) => {
    // key: is a table column name
    // value: is the value of the submission data
    return {
      id: submission._id,
      email: submission.Email_Address,
      formId: submission._xform_id_string, // the sample payload has no formId key
      submission_time: submission._submission_time,
      // other table columns
      // e.g. status: submission.status,
      // e.g. created_at: submission._created_at,
      // e.g. updated_at: submission._updated_at,
    };
  });
  return state;
});

Step 3: Upsert Submission adaptor:postgresql
Use upsertMany to add your mapped data into the DB

upsertMany(
  "users", // the DB table
  "email", // a DB column with a unique constraint. If this key is not present, the record will be created, if it exists, it will be updated
  $.submissionMapping,
  { logValues: true } // optional: to see the SQL query
);

Look at the comments in the sample code I shared; they will help you understand the code more clearly.
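
For intuition about the "created if missing, updated if present" behaviour, an upsert in PostgreSQL is usually written as `INSERT ... ON CONFLICT ... DO UPDATE`. The sketch below builds such a statement in plain JavaScript. `buildUpsertMany` is a hypothetical helper made up for this illustration, and the SQL the adaptor actually generates may well differ. Either way, the conflict column needs a unique index or constraint in the table.

```javascript
// Illustrative only: a hand-rolled statement builder, not the adaptor's code.
// Assumes every record has the same keys as the first one.
const buildUpsertMany = (table, uniqueColumn, records) => {
  const columns = Object.keys(records[0]);
  const values = [];

  // One "($1, $2, ...)" group per record, numbering placeholders as we go.
  const rows = records.map((record) => {
    const placeholders = columns.map((column) => {
      values.push(record[column] ?? null);
      return `$${values.length}`;
    });
    return `(${placeholders.join(', ')})`;
  });

  // Quote identifiers so mixed-case column names survive.
  const columnList = columns.map((column) => `"${column}"`).join(', ');

  // On conflict, overwrite every column except the unique key itself.
  const updates = columns
    .filter((column) => column !== uniqueColumn)
    .map((column) => `"${column}" = EXCLUDED."${column}"`)
    .join(', ');

  const text =
    `INSERT INTO "${table}" (${columnList}) VALUES ${rows.join(', ')} ` +
    `ON CONFLICT ("${uniqueColumn}") DO UPDATE SET ${updates}`;

  return { text, values };
};

const { text, values } = buildUpsertMany('users', 'email', [
  { id: 4, email: 'xxx@gmail.com', submission_time: '2024-07-17T04:40:26' },
  { id: 5, email: 'zzz@gmail.com', submission_time: '2024-07-17T04:42:07' },
]);

console.log(text);
console.log(values);
```

Passing the values separately from the statement text keeps the data out of the SQL string, which is the usual way to avoid quoting problems with user-submitted form data.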

Thanks @mtuchi , Let me give it a try

@mtuchi I have done the mapping for most of the data points, but I'm wondering where to put the mapping for the attachments, since it's going to be a child table.

Should I create another function that handles the mapping for the attachments?
If so, how will I pass the attachments for each submission to upsertMany?

Hiya @stephencoduor, in that case you will have to insert the parent data first, then insert the child data.

For example:

upsertMany('parents', 'id', $.parentsMap);

fn(state => {
  // You need parent_id for your children
  // You might need to write an SQL query to look up the parent_id for each child
  state.childrensMap = state.parents.flatMap(({ id, childrens }) =>
    childrens.map(child => ({
      parent_id: id,
      // Map other children columns
      name: child.name,
    }))
  );
  return state;
});

upsertMany('children', 'id', $.childrensMap);

Something to keep in mind: you can always write an SQL query for your operations. If you are comfortable writing SQL queries for updating both parent and children, you can use the sql(qs) function to execute that query.

Thanks @mtuchi for the explanation


@mtuchi I intend to use a different approach.

For each submission, I intend to perform the following steps:

  1. Upsert the main (parent) record into the capfish table.
  2. Upsert multiple related records into the capfish_attachments table, which serves as the child table.

This approach differs from the initial suggestion, where the plan was to:

  1. Bulk upsert all submissions into the parent table (capfish) using upsertMany.
  2. Then, bulk upsert all attachments into the child table (capfish_attachments), assuming all parent records had already been created.

In the revised approach, each submission is processed individually — the parent record is upserted first, followed immediately by its associated attachments.

I think this will ensure that each set of attachments is explicitly tied to its corresponding submission in a more controlled and sequential manner.

What are your thoughts on this approach?

My apologies, I am still new to OpenFn and still learning.

each('$.submissionMapping[*]', state => {
  const submission = state.data;

  return upsert('capfish', 'answer_id', {
    answer_id: submission._id,
    email_Address: submission.Email_Address,
    // Map other columns
    meta_instanceID: submission.meta_instanceID,
    start_time: submission.start_time,
  }).then(state => {
    const attachments = (submission._attachments || []).map(a => ({
      answer_id: submission._id,
      attachment_id: a.id,
      large_url: a.download_large_url,
      medium_url: a.download_medium_url,
      url: a.download_url,
      file_name: a.filename,
      // Map other children columns
    }));

    return upsertMany('capfish_attachments', 'attachment_id', attachments)(state);
  });
});

Hiya @stephencoduor

Based on the sample code you shared, you can improve the code to avoid some anti-patterns. Something that’s very important to remember is that operations run at the top level. So when you see yourself doing something like upsert(...args)(state), this is a red flag and you should consider refactoring your approach.

Here is what I would suggest. Since capfish_attachments does not depend on a newly created id from the capfish table, you can still use upsertMany for both operations. Here is how it will look :backhand_index_pointing_down:

fn((state) => {
  state.capfishMap = [];
  state.capfishAttachments = [];

  state.data.forEach((submission) => {
    state.capfishMap.push({
      answer_id: submission._id,
      email_Address: submission.Email_Address,
      //Map other columns
      meta_instanceID: submission.meta_instanceID,
      start_time: submission.start_time,
    });

    state.capfishAttachments.push(
      // Fall back to an empty list when a submission has no attachments
      ...(submission._attachments || []).map((attachment) => ({
        answer_id: submission._id,
        attachment_id: attachment.id,
        large_url: attachment.download_large_url,
        medium_url: attachment.download_medium_url,
        url: attachment.download_url,
        file_name: attachment.filename,
      }))
    );
  });

  return state;
});

upsertMany("capfish", "answer_id", $.capfishMap);
upsertMany("capfish_attachments", "attachment_id", $.capfishAttachments);
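
If you want to sanity-check the flattening before pointing it at a real database, something like the following can be run in plain Node, outside OpenFn. The two submissions are a made-up, trimmed-down sample in the same shape as the KoboToolbox payload, and the `?? []` fallback is an assumption added here to cover submissions that arrive without an `_attachments` key.

```javascript
// Made-up sample: same shape as the KoboToolbox payload, heavily trimmed.
const submissions = [
  {
    _id: 4,
    Email_Address: 'xxx@gmail.com',
    _attachments: [
      { id: 1, filename: 'a.jpg' },
      { id: 2, filename: 'b.mp4' },
    ],
  },
  { _id: 5, Email_Address: 'zzz@gmail.com' }, // no _attachments key at all
];

// One parent row per submission.
const capfishMap = submissions.map((submission) => ({
  answer_id: submission._id,
  email_Address: submission.Email_Address,
}));

// One child row per attachment, each carrying its parent's answer_id.
const capfishAttachments = submissions.flatMap((submission) =>
  (submission._attachments ?? []).map((attachment) => ({
    answer_id: submission._id,
    attachment_id: attachment.id,
    file_name: attachment.filename,
  }))
);

// Every child row should point at a parent row that exists.
const parentIds = new Set(capfishMap.map((row) => row.answer_id));
const orphans = capfishAttachments.filter(
  (row) => !parentIds.has(row.answer_id)
);

console.log({
  parents: capfishMap.length,
  children: capfishAttachments.length,
  orphans: orphans.length,
});
```

Because each attachment row already carries its parent's `answer_id`, the two bulk upserts stay correctly linked without processing submissions one at a time.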

Wow!! Thanks @mtuchi