"""LEGAL DISCLAIMER:

This script is provided "as is" as a code sample for demonstration purposes only.
Modigie, Inc. makes no representations or warranties of any kind, express or implied,
about the completeness, accuracy, reliability, suitability, or availability with respect
to this script for any purpose. Any reliance you place on such information is therefore
strictly at your own risk.

In no event will we be liable for any loss or damage including without limitation,
indirect or consequential loss or damage, or any loss or damage whatsoever arising
from loss of data or profits arising out of, or in connection with, the use of this script.

This script is not intended for production use.

===============================================================================

Subscribe to enrichMobile job responses and write completed ones to disk.

Pulls messages from the enrichMobile response subscription. The service emits
two kinds of messages:

  - Request discards: signaled by the modigieJobRequestDiscardReasonType Pub/Sub
    attribute. The request was malformed (missing/invalid/duplicate
    modigieJobRequestId, or invalid JSON body) and no job resource was created.
    There will be no further messages for this requestId.

  - Job lifecycle messages: created, validated, enqueued, dispatching,
    processing, completed, rejected, failed, etc. Multiple messages may be
    sent per request as the job progresses.

This subscriber:

  - Logs request discards with the discard reason type
  - Persists messages with body.status == "completed" to ./responses/{requestId}.json
  - Logs and discards every other status (with the reason, if present)
  - Logs and discards messages whose body isn't valid JSON

Each completed response is named after its modigieJobRequestId (echoed back in
the Pub/Sub attributes) so it correlates 1:1 with the input CSV row.

Runs until Ctrl-C.

Usage:
  python subscriber.py [output_dir]

  output_dir defaults to ./responses

"""

import json
import os
import sys
from concurrent.futures import TimeoutError as FuturesTimeoutError

from google.cloud import pubsub


# --8<-- [start:configuration]
# GCP project that owns the Pub/Sub subscription.
PROJECT_ID = "modigie-c-q8irqgvocgg8lt4dwa3e"  # (1)!

# Pub/Sub subscription that receives enrichMobile lifecycle/job response events.
SUBSCRIPTION_ID = "inpubsub-job-response-enrichmobile-pull-all-externalendpoint-2678a46bd60d84c"  # (2)!
# --8<-- [end:configuration]


def extract_reason(body: dict) -> str:
    """Pull a short human-readable reason out of a response body, if present.

    Non-completed responses include a nested 'reason' field. This helper extracts the
    most useful display text for logging.

    """
    reason = body.get("reason")
    if isinstance(reason, dict):
        return reason.get("body") or reason.get("message") or json.dumps(reason)
    if isinstance(reason, str):
        return reason
    return ""


def handle_response(outputDir: str, message) -> None:
    """Process one Pub/Sub response message.  Implementation must be idempotent! (Same message may be delivered more
    than once)

    Behavior:
      - discarded -> log reason why job was not created and skip processing
      - completed -> save JSON body to disk
      - any other status -> log and discard
      - invalid JSON / unexpected body type -> log and discard

    """
    # Correlation ID sent originally by the publisher and echoed back in response.
    requestId = message.attributes.get("modigieJobRequestId", "unknown")

    # Check first whether the request was discarded by the service before any job was created.
    # Discards are signaled by the modigieJobRequestDiscardReasonType Pub/Sub attribute and happen
    # when the request itself was malformed: missing/invalid/duplicate modigieJobRequestId, or invalid JSON body.
    # The message body is an Error object with details rather than a job resource.
    # NOTE: Discarded job requests should be marked in your system because no more job responses will arrive.
    discardReasonType = message.attributes.get("modigieJobRequestDiscardReasonType")
    if discardReasonType:
        rawBody = message.data.decode("utf-8", errors="replace")
        print(f"[discarded] {requestId} - no job was created due to {discardReasonType} - {rawBody[:500]}")
        return

    # Pub/Sub message data is bytes, so decode before JSON parsing.
    rawBody = message.data.decode("utf-8")

    try:
        # Parse Pub/Sub payload as JSON.
        body = json.loads(rawBody)
    except json.JSONDecodeError as e:
        print(f"[unparseable] {requestId} - {e}")
        return

    # Response payload is expected to be a JSON object.
    if not isinstance(body, dict):
        print(f"[unexpected] {requestId} - body is not an object: {type(body).__name__}")
        return

    # Lifecycle status determines whether to persist or discard the job response message.
    status = body.get("status")

    if status == "completed":
        # Ensure output directory exists before writing the response file.
        os.makedirs(outputDir, exist_ok=True)

        # Save one completed response per requestId for easy 1:1 tracking.
        path = os.path.join(outputDir, f"{requestId}.json")

        with open(path, "w", encoding="utf-8") as f:
            json.dump(body, f, ensure_ascii=False, indent=2)

        print(f"[completed] {requestId} -> {path}")
    else:
        # Log other lifecycle states for visibility but do not persist them.
        reason = extract_reason(body)
        suffix = f" - {reason}" if reason else ""
        print(f"[{status or 'noStatus'}] {requestId}{suffix}")


def listen_for_responses(outputDir: str) -> None:
    """Start a streaming subscriber and process messages until interrupted."""
    # Create Pub/Sub subscriber client and fully qualified subscription path.
    subscriber = pubsub.SubscriberClient()
    subscriptionPath = subscriber.subscription_path(PROJECT_ID, SUBSCRIPTION_ID)

    def callback(message) -> None:
        """Pub/Sub callback executed for each incoming message."""
        try:
            handle_response(outputDir, message)

            # Ack after processing so the message is removed from the subscription.
            message.ack()
        except Exception as e:
            requestId = message.attributes.get("modigieJobRequestId", "unknown")
            rawBody = message.data.decode("utf-8", errors="replace")

            print(f"Error handling {requestId}: {e}")
            print(f"  Raw body: {rawBody[:500]}")

            # Ack to drop the message
            # redelivery won't help and would keep clogging the subscription.
            message.ack()

    # Start background streaming pull.
    streamingPullFuture = subscriber.subscribe(subscriptionPath, callback=callback)

    print(f"Listening for responses on {subscriptionPath}")
    print(f"Writing completed responses to {os.path.abspath(outputDir)}")
    print("Press Ctrl-C to stop.\n")

    # Keep the subscriber alive until the process is interrupted.
    with subscriber:
        try:
            streamingPullFuture.result()
        except (KeyboardInterrupt, FuturesTimeoutError):
            streamingPullFuture.cancel()
            streamingPullFuture.result()
            print("\nStopped.")


if __name__ == "__main__":
    # Optional first argument lets the user override the default output folder.
    outputDir = sys.argv[1] if len(sys.argv) > 1 else "responses"
    listen_for_responses(outputDir)
