enrichMobile Pub/Sub sample¶
A minimal Python sample showing how to publish enrichMobile job requests to
Modigie's Pub/Sub interface and consume the job responses through a pull subscription (long-polling).
It consists of two scripts and a CSV file. Download all files by clicking on each link:
- `publisher.py`: reads a CSV of input rows and publishes one request per row
- `subscriber.py`: pulls responses from the response subscription and writes completed ones to disk
- `sample_input.csv`: the CSV file with sample inputs that you can use safely with a synthetic repository
Requirements¶
- Python 3.9+
- `google-cloud-pubsub`
- A GCP identity (user account or service account) with permission to publish to the request topic and pull from the response subscription
Install the dependency with pip: `pip install google-cloud-pubsub`
Authenticating to GCP¶
The scripts use Application Default Credentials (ADC). If you've never set up ADC on this machine, both scripts will fail with an authentication error on the first call.
Option 1 — user account (easiest for trying out the sample):
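Run `gcloud auth application-default login`, then `gcloud auth application-default set-quota-project PROJECT_ID`.
The first command opens a browser for you to sign in.
The second tells GCP which project to bill the API calls against; without it you'll see a UserWarning on every run (harmless, but noisy).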
Important
Before executing the second command, replace `PROJECT_ID` with the configuration value you received from Modigie.
- Replace `PROJECT_ID` with the exclusive tenant project ID of your Modigie Org Account. Example: `modigie-c-q8irqgvocgg8lt4dwa3e`.
Option 2 — service account (recommended for unattended use):
Download the service account key JSON from GCP, then point ADC at it by setting the `GOOGLE_APPLICATION_CREDENTIALS` environment variable to the path of the key file.
- Use the actual path to your service account key JSON file.
The service account must have these IAM roles on the project:
- `roles/pubsub.publisher` on the request topic
- `roles/pubsub.subscriber` on the response subscription
By default, this is the Application Account service account that Modigie shared with you, and the necessary IAM roles are already granted to it.
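As a quick pre-flight check before running either script, the sketch below reports which ADC source is likely to be picked up on this machine. It only looks at the `GOOGLE_APPLICATION_CREDENTIALS` environment variable and at the well-known file written by `gcloud auth application-default login`; it does not validate the credentials or contact GCP. The helper names `well_known_adc_file` and `find_adc_source` are hypothetical, and other credential sources (for example a metadata server, or a custom gcloud config directory) are not covered.

```python
import os
from pathlib import Path
from typing import Mapping, Optional


def well_known_adc_file(env: Mapping[str, str], home: Path) -> Path:
    """Default location of user ADC created by gcloud (Windows uses %APPDATA%)."""
    if os.name == "nt" and env.get("APPDATA"):
        base = Path(env["APPDATA"]) / "gcloud"
    else:
        base = home / ".config" / "gcloud"
    return base / "application_default_credentials.json"


def find_adc_source(env: Optional[Mapping[str, str]] = None,
                    home: Optional[Path] = None) -> Optional[str]:
    """Return a short description of the ADC source found, or None."""
    env = os.environ if env is None else env
    home = Path.home() if home is None else home

    key_path = env.get("GOOGLE_APPLICATION_CREDENTIALS")
    if key_path:
        if Path(key_path).is_file():
            return f"key file from GOOGLE_APPLICATION_CREDENTIALS: {key_path}"
        return None  # the variable is set but points at a missing file

    user_file = well_known_adc_file(env, home)
    if user_file.is_file():
        return f"user credentials: {user_file}"
    return None


if __name__ == "__main__":
    print(find_adc_source() or "No ADC found; see Option 1 or Option 2 above.")
```

A `None` result only means that neither of these two locations holds a file; it is a hint, not proof that authentication will fail.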
Configuration¶
Both scripts have project, topic, and subscription IDs hardcoded as constants at the top of the file.
Important
Because you're running against a different environment (your own project, a different account, etc.), replace these constants with the configuration values you received from Modigie.
- `PROJECT_ID` and `TOPIC_ID` in publisher.py

  ```python
  # TODO: Replace with GCP project that owns the Pub/Sub topic.
  PROJECT_ID = "modigie-c-q8irqgvocgg8lt4dwa3e"  # (1)
  # TODO: Replace with Pub/Sub topic that receives enrichMobile JobV2 request messages.
  TOPIC_ID = "inpubsub-job-request-enrichmobile-2678a46bd60d84c"  # (2)
  ```

  1. Replace `PROJECT_ID` with the exclusive tenant project ID of your Modigie Org Account.
  2. Replace `TOPIC_ID` with your job request topic for the job type enrichMobile. The part after `/topics/`.

- `PROJECT_ID` and `SUBSCRIPTION_ID` in subscriber.py

  ```python
  # GCP project that owns the Pub/Sub subscription.
  PROJECT_ID = "modigie-c-q8irqgvocgg8lt4dwa3e"  # (1)
  # Pub/Sub subscription that receives enrichMobile lifecycle/job response events.
  SUBSCRIPTION_ID = "inpubsub-job-response-enrichmobile-pull-all-externalendpoint-2678a46bd60d84c"  # (2)
  ```

  1. Replace `PROJECT_ID` with the exclusive tenant project ID of your Modigie Org Account.
  2. Replace `SUBSCRIPTION_ID` with your job response pull subscription for the job type enrichMobile. The part after `/subscriptions/`.
Important
If your repository allows multiple job types, make sure that you pick the correct topic and subscription. Topics and subscriptions are job-type specific.
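The constants hold short IDs, while Pub/Sub addresses resources by full path (`projects/<project>/topics/<topic>` and `projects/<project>/subscriptions/<subscription>`). The sketch below illustrates that relationship and how to recover the short ID if you were given a full path. The helper names are hypothetical; the `google-cloud-pubsub` clients provide `topic_path` and `subscription_path` methods that build the same strings.

```python
PROJECT_ID = "modigie-c-q8irqgvocgg8lt4dwa3e"
TOPIC_ID = "inpubsub-job-request-enrichmobile-2678a46bd60d84c"
SUBSCRIPTION_ID = "inpubsub-job-response-enrichmobile-pull-all-externalendpoint-2678a46bd60d84c"


def topic_path(project_id: str, topic_id: str) -> str:
    """Full resource path of a topic."""
    return f"projects/{project_id}/topics/{topic_id}"


def subscription_path(project_id: str, subscription_id: str) -> str:
    """Full resource path of a subscription."""
    return f"projects/{project_id}/subscriptions/{subscription_id}"


def short_id(resource: str, collection: str) -> str:
    """Return the part after '/<collection>/'; short IDs pass through unchanged."""
    marker = f"/{collection}/"
    return resource.split(marker, 1)[1] if marker in resource else resource


if __name__ == "__main__":
    full = topic_path(PROJECT_ID, TOPIC_ID)
    print(full)
    print(short_id(full, "topics"))
```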
Publisher¶
Reads a CSV and publishes one job request per row.
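The read-and-publish loop can be sketched roughly as follows. This is not the shipped publisher.py, only an illustration under stated assumptions: the attribute names `modigieJobRequestId` and `modigieJobExpireAfter` are the ones this page documents, omitting the expiry attribute for an empty `expireAfter` cell is an assumption, and `build_body` is a hypothetical callable standing in for the JSON payload schema, which is not shown here. The `publisher` argument is expected to behave like `google.cloud.pubsub_v1.PublisherClient`, whose `publish(topic, data, **attrs)` returns a future.

```python
import csv
import json
from typing import Callable, Dict, Iterable, List


def read_rows(csv_path: str) -> List[Dict[str, str]]:
    """Load the CSV into one dict per row, keyed by the header names."""
    with open(csv_path, newline="", encoding="utf-8") as handle:
        return list(csv.DictReader(handle))


def row_to_attributes(row: Dict[str, str]) -> Dict[str, str]:
    """Map CSV columns to the Pub/Sub attributes named on this page."""
    attributes = {"modigieJobRequestId": row["requestId"]}
    expire_after = (row.get("expireAfter") or "").strip()
    if expire_after:  # assumption: leave the attribute out when the cell is empty
        attributes["modigieJobExpireAfter"] = expire_after
    return attributes


def publish_rows(publisher, topic: str, rows: Iterable[Dict[str, str]],
                 build_body: Callable[[Dict[str, str]], dict]) -> List[str]:
    """Publish one message per row and return the message IDs in row order."""
    futures = []
    for row in rows:
        data = json.dumps(build_body(row)).encode("utf-8")
        futures.append(publisher.publish(topic, data, **row_to_attributes(row)))
    # Wait for every publish to be confirmed before returning.
    return [future.result() for future in futures]
```

With the real client, `publisher` would be `pubsub_v1.PublisherClient()` and `topic` the value of `publisher.topic_path(PROJECT_ID, TOPIC_ID)`. Passing the client in as a parameter keeps the loop easy to exercise with a stand-in object.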
CSV input format¶
The publisher expects a CSV file with a header row.
Required columns:
- `requestId`
- `firstName`
- `lastName`
- At least one of `email` or `linkedinUrl` (both may be provided on the same row)

Optional columns:
- `jobTitle`
- `company`
- `expireAfter`
Expected header row: `requestId,firstName,lastName,jobTitle,company,email,linkedinUrl,expireAfter`
Example CSV¶
The included sample_input.csv exercises every code path the subscriber
handles — successful completion across the three input shapes (email-only,
URL-only, both), a job-level rejection, and a request-level discard:
requestId,firstName,lastName,jobTitle,company,email,linkedinUrl,expireAfter
req_b01_email_only,John,Doe,Software Engineer,Acme Inc.,jdoe@acme.com,,P2D
req_b02_url_only,Kelly,Sample,Engineer,Acme Inc.,,https://www.linkedin.com/in/kelly-location,P2D
req_b03_email_and_url,Jane,Smith,Product Manager,Globex,jsmith@globex.com,https://www.linkedin.com/in/kelly-country-location,P2D
req_b04_missing_company,Kelly,Sample,,,kelly@example.com,https://www.linkedin.com/in/kelly-email-country-location,
req_b05_bad_email,Bob,Test,Engineer,Acme Inc.,not-a-valid-email,,P2D
req_b01_email_only,Different,Person,,,other@example.com,,
What each row demonstrates:
| Row | Demonstrates |
|---|---|
| req_b01_email_only (1st) | Successful completion with email-only input |
| req_b02_url_only | Successful completion with LinkedIn URL only |
| req_b03_email_and_url | Successful completion with both inputs |
| req_b04_missing_company | Job-level rejection (missing employment.company.title) |
| req_b05_bad_email | Job-level rejection (no usable identifier) |
| req_b01_email_only (2nd) | Request-level discard (DuplicateJobIdError) |
Notes¶
- `requestId` must be unique per row. It is sent with the job request as the Pub/Sub attribute `modigieJobRequestId`. Reusing a `requestId` with a different payload triggers a `DuplicateJobIdError` discard; reusing one with an identical payload causes the service to replay the previous final response rather than create a new job.
- `expireAfter` must be an ISO-8601 duration, for example `P1D` (1 day) or `PT4H` (4 hours). It is sent with the job request as the Pub/Sub attribute `modigieJobExpireAfter`. Also see Duration | ISO 8601 | Wikipedia.
- The script does not fully validate business rules before publishing; validation is mainly handled by the downstream service.
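If you want to catch obvious input problems before publishing, a local check along these lines may help. It is a minimal sketch covering only the rules stated on this page (required columns, at least one of `email` or `linkedinUrl`, duration-shaped `expireAfter`, unique `requestId`); it is not the service's validation. The duration pattern is deliberately simplified (integer components only), and `validate_rows` is a hypothetical helper name.

```python
import re
from typing import Dict, Iterable, List, Tuple

REQUIRED_COLUMNS = ("requestId", "firstName", "lastName")

# Simplified ISO-8601 duration: PnW, or PnYnMnDTnHnMnS with integer parts only.
DURATION_RE = re.compile(
    r"^P(?!$)(?:\d+W|(?:\d+Y)?(?:\d+M)?(?:\d+D)?"
    r"(?:T(?=\d)(?:\d+H)?(?:\d+M)?(?:\d+S)?)?)$"
)


def _cell(row: Dict[str, str], column: str) -> str:
    """Return the trimmed cell value, treating a missing column as empty."""
    return (row.get(column) or "").strip()


def validate_rows(rows: Iterable[Dict[str, str]]) -> List[Tuple[int, str]]:
    """Return (row_number, problem) pairs; row 1 is the first row after the header."""
    problems: List[Tuple[int, str]] = []
    seen_ids = set()
    for number, row in enumerate(rows, start=1):
        for column in REQUIRED_COLUMNS:
            if not _cell(row, column):
                problems.append((number, f"missing {column}"))
        if not _cell(row, "email") and not _cell(row, "linkedinUrl"):
            problems.append((number, "need email or linkedinUrl"))
        expire_after = _cell(row, "expireAfter")
        if expire_after and not DURATION_RE.match(expire_after):
            problems.append((number, "expireAfter is not an ISO-8601 duration"))
        request_id = _cell(row, "requestId")
        if request_id:
            if request_id in seen_ids:
                problems.append((number, f"duplicate requestId {request_id}"))
            seen_ids.add(request_id)
    return problems
```

Run against the included sample_input.csv, a check like this would flag only the deliberately repeated `requestId`; the rows that the service rejects for business reasons would still pass, since those rules live downstream.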
Subscriber¶
Pulls responses from the response subscription and processes them based on what the service sent. The service emits two kinds of messages:
- Request discards: signaled by the `modigieJobRequestDiscardReasonType` Pub/Sub attribute. The request was malformed (missing/invalid/duplicate `modigieJobRequestId`, or invalid JSON body) and no job was created. There will be no further messages for the requestId. In a production app, mark the requestId as terminal in your own tracking so you don't wait for responses that will never come.
- Job lifecycle messages: `created`, `validated`, `enqueued`, `dispatching`, `processing`, `completed`, `rejected`, `failed`, etc. Multiple messages may be emitted per job as it progresses. Only `completed` is persisted to disk; the rest are logged.
Each completed response is written to {outputDir}/{requestId}.json, so you
can join input CSV rows to output files by requestId.
The subscriber runs until you press Ctrl-C.
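The per-message decision described above can be sketched as a small function. This is an illustration, not the code in subscriber.py. This page does not show how the request ID and the lifecycle status are carried in a response, so the sketch assumes the caller has already extracted both and passes them in; `handle_response` and its return shape are hypothetical.

```python
import json
from pathlib import Path
from typing import Mapping, Optional, Tuple

DISCARD_ATTRIBUTE = "modigieJobRequestDiscardReasonType"


def handle_response(attributes: Mapping[str, str], data: bytes,
                    request_id: str, status: Optional[str],
                    output_dir: Path) -> Tuple[str, str]:
    """Classify one pulled message and persist it if the job completed.

    Returns (kind, detail), where kind is "discard", "completed" or "lifecycle".
    """
    reason_type = attributes.get(DISCARD_ATTRIBUTE)
    if reason_type is not None:
        # No job was created; nothing else will arrive for this request.
        return "discard", reason_type

    if status == "completed":
        output_dir.mkdir(parents=True, exist_ok=True)
        target = output_dir / f"{request_id}.json"
        target.write_bytes(data)  # keep the response body exactly as received
        return "completed", str(target)

    # created, validated, enqueued, ... are reported but not stored.
    return "lifecycle", status or "unknown"


if __name__ == "__main__":
    body = json.dumps({"example": True}).encode("utf-8")
    print(handle_response({}, body, "req_demo", "completed", Path("responses")))
```

With `google-cloud-pubsub`, `attributes` and `data` would come from `received.message.attributes` and `received.message.data` for each entry in a pull response's `received_messages`. Checking the discard attribute first matters because a discard has no job lifecycle to inspect.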
Running end-to-end¶
Use two terminals. Start the subscriber first — Pub/Sub will hold messages on the subscription either way, but starting the subscriber first lets you watch responses arrive in real time instead of wondering whether anything is happening.
Terminal 1 — subscriber:
Terminal 2 — publisher:
You'll see lifecycle messages flow through the subscriber:
[created] req_b01_email_only
[validated] req_b01_email_only
[enqueued] req_b01_email_only
[dispatching] req_b01_email_only
[processing] req_b01_email_only
[completed] req_b01_email_only -> /path/to/responses/req_b01_email_only.json
Rejected jobs (the request became a job, then validation failed) include the reason:
[rejected] req_b04_missing_company - The job payload must contain `employment.company.title` of type `string`
Discarded requests (the request never became a job — typically a duplicate or
malformed modigieJobRequestId) include the discard reason type:
When you receive a discard, your app should mark the requestId as terminal in its own tracking, because no further messages will arrive for it.
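One way to keep such tracking is a small in-memory registry of published request IDs. The sketch below is only an illustration: `RequestTracker` is a hypothetical class, and treating `completed`, `rejected`, and `failed` as final statuses is an assumption you should confirm against Modigie's documentation.

```python
from typing import Dict, Iterable, List, Optional

# Assumption: these lifecycle statuses end a job.
FINAL_STATUSES = frozenset({"completed", "rejected", "failed"})


class RequestTracker:
    """Tracks which published requestIds still await a final outcome."""

    def __init__(self, request_ids: Iterable[str]) -> None:
        self._pending = set(request_ids)
        self._outcomes: Dict[str, str] = {}

    def record_discard(self, request_id: str, reason_type: str) -> None:
        """A discard is terminal: no job exists and no messages will follow."""
        self._finish(request_id, f"discarded:{reason_type}")

    def record_status(self, request_id: str, status: str) -> None:
        """Intermediate statuses leave the request pending."""
        if status in FINAL_STATUSES:
            self._finish(request_id, status)

    def _finish(self, request_id: str, outcome: str) -> None:
        self._pending.discard(request_id)
        # The first recorded outcome wins. A duplicate-ID discard shares its
        # requestId with the original job, so real tracking may need a finer key.
        self._outcomes.setdefault(request_id, outcome)

    def pending(self) -> List[str]:
        return sorted(self._pending)

    def outcome(self, request_id: str) -> Optional[str]:
        return self._outcomes.get(request_id)
```

Once `pending()` returns an empty list, every published request has reached an outcome and the subscriber has nothing left to wait for.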