Logo

Mocks for microservice environments

Getting Started

Tutorial Videos

Config Syntax

Request Matching

Response Templating

Async Actors (Kafka, AMQP etc.)

Performance & Chaos Testing

Remote Management

Changelog

Asynchronous Actors for Message Bus

Mockintosh offers “Mock Actor” approach for using with asynchronous message bus technologies, such as Apache Kafka, RabbitMQ, Apache ActiveMQ, Redis etc. or cloud services such as Google Cloud Pub/Sub, Amazon SQS etc. See supported backends. “Mock Actor” approach requires you to provide deployed message bus instance, and configure valid address for it inside configuration file.

Each Mock Actor is an entity that can be configured for producing or consuming messages or both. Management API/UI provides important synergies to work with Mock Actors.

To start creating Mock Actors in Mockintosh config, you need to configure a service, with type option set into kafka:

services:
  - name: Kafka Mock Actors
    type: kafka
    address: localhost:9092  # broker string of pre-existing Kafka instance
    ssl: true
    actors: [ ]  # here we will configure the actors, see below

If you’re connecting to a Kafka cluster through SSL you need to set ssl field to true. By default SSL is disabled.

Note: The address field of asynchronous services supports templating. Such that the address can be fetched from an environment variable like: address: "{{env 'KAFKA' 'localhost:9092'}}"

Below are the configuration patterns for Mock Actors:

Scheduled Producer

Below is the configuration snippet for Mock Actor that will produce configured message each delay seconds, up to limit times. The delay option is key for this case, it distinguishes scheduled producer from “on-demand producer”.

services:
  - name: Kafka Mock Actors
    type: kafka
    address: localhost:9092
    actors:
      - name: scheduled-producer-1  # just the name
        produce:
          queue: scheduled-queue1  # topic/queue name
          key: "message key, can be null"
          value: "message value"
          headers:
            constant: justvalue
            timestamp: '{{date.timestamp}}'  # regular Mockintosh templating can be used

        delay: 5  # seconds between producing
        limit: 100  # limit of how many messages to produce, optional

You can use most of Mockintosh templating equations, with exception of those dependant on request. Queue name and header names cannot be templated.

Note: Be careful with setting delay to low value with no limit option, this may run your message bus out of memory/disk space.

On-Demand Producer

On-demand producer is basically a scheduled producer with no delay option. Instead of producing messages automatically, this kind of actor waits for Management API call to trigger the message push.

management:
  port: 8000
services:
  - name: Kafka Mock Actors
    type: kafka
    address: localhost:9092
    actors:
      - name: on-demand-1
        produce:
          create: true
          queue: on-demand1
          key: somekey or null
          value: "@value/from/file.json"  # it's possible to reference file

Now, to trigger producing the message on-demand, you need to issue an API call using actor’s name, like this:

curl -X POST http://localhost:8000/async/producers/on-demand-1

and the response of this request would be;

{
  "type": "kafka",
  "name": "on-demand-1",
  "index": 0,
  "queue": "on-demand1",
  "lastProduced": 1618695168.6416173
}

Note: create: true flag enables the topic creation if the given topic is not created yet.

It’s also possible to select the producer using its index number as an alternative to the actor’s name like:

curl -X POST http://localhost:8000/async/producers/0

Note: The limit option actually works for any kind of producer.

Triggering a Producer From an HTTP Endpoint

It’s possible trigger an asynchronous producer from an HTTP endpoint using the triggerAsyncProducer field. The value of this field can be either a possitive integer that indicates the global index of a producer or a string that selects the producer based on its name:

management:
  port: 8000
services:
  - name: Mock for Service1
    port: 8001
    endpoints:
    - path: "/endp1"
      response:
        body: "endp1"
        triggerAsyncProducer: 0
    - path: "/endp2"
      response:
        body: "endp2"
        triggerAsyncProducer: on-demand-1
  - name: Kafka Mock Actors
    type: kafka
    address: localhost:9092
    actors:
      - name: on-demand-1
        produce:
          create: true
          queue: on-demand1
          key: somekey or null
          value: "@value/from/file.json"  # it's possible to reference file

The producer on-demand-1 that’s linked to the HTTP endpoints via the triggerAsyncProducer field is triggered whenever the an HTTP request is matched into the subject endpoint and the related response is returned.

Validating Consumer

The “validating consumer” actor is used when you need to check the fact of service publishing the message on the bus. For example, if your service accepts REST call and puts message on the bus, and you validating this behavior in an automated test. Again, for this validating you would need to have Management API enabled. Let’s see a snippet:

management:
  port: 8000
services:
  - name: Kafka Mock Actors
    type: kafka
    address: localhost:9092
    actors:
      - name: validate-consume-1
        consume:
          queue: another-queue-name # topic/queue name to subscribe to, required
          group: "consumer-group" # optional consumer group
          key: matching keys  # expected key, optional
          value: "expected value"  # optional
          capture: 10  # limit len of messages to store for validation, optional, default: 1

To validate that message has appeared on the bus, you have to query Management API endpoint, like this:

curl http://localhost:8000/async/consumers/validate-consume-1

That would respond with a JSON containing the list of captured messages in the HAR format, that’s quite similar to the responses you can see in Traffic Logs. The traffic logging is specific to the selected consumer. The consumer will store last N messages, according to its capture setting.

To clear the captured message list, issue a DELETE call on the same URL:

curl -X DELETE http://localhost:8000/async/consumers/validate-consume-1

To narrow down the expected message, you can use regular matching equations in key, value or headers values:

management:
  port: 8000
services:
  - name: Kafka Mock Actors
    type: kafka
    address: localhost:9092
    actors:
      - name: validate-consume-2
        consume:
          queue: another-queue-name
          key: "{{regEx 'prefix-(.*)'}}"
          value: "expected prefix-{{justName}}"  # see also "reactive producer" section
          headers:
            hdr-name: "{{regEx 'prefix-(.+)-suffix' 'myCapturedVar'}}" # see also "reactive producer" section

JSONSchema

The value field supports JSON Schema matching much like how it’s in HTTP. Except it’s the schema field used instead of or with the value field:

- consume:
    queue: validate-consume-3
    key: "{{regEx 'prefix-(.*)'}}"
    schema:
      type: object
      properties:
        somekey: { }
      required:
        - somekey
    headers:
      hdr-name: "{{regEx 'prefix-(.+)-suffix' 'myCapturedVar'}}"

If the schema field used together with the value then both are taken into account as the criteria for matching.

Referencing JSONSchema from an external file is also supported:

- consume:
    queue: validate-consume-3
    key: "{{regEx 'prefix-(.*)'}}"
    schema: "@path/to/schema.json"
    headers:
      hdr-name: "{{regEx 'prefix-(.+)-suffix' 'myCapturedVar'}}"

“Reactive” Producer

By mixing together actors of “validating consumer” and “on-demand producer” types, we can get the behavior when message is produced in “reaction” to another message consumed from the bus. You can also specify a delay between consumption and producing, to simulate some “processing time”.

services:
  - name: Kafka Mock Actors
    type: kafka
    address: localhost:9092
    actors:
      - name: reactive-producer-1
        consume:
          queue: consume-from-topic-1
          key: "{{regEx 'prefix-(.*)'}}"
          value: "expected prefix-{{justName}}"  # see also "reactive producer" section
          headers:
            hdr-name: "{{regEx 'prefix-(.+)-suffix' 'myCapturedVar'}}" # see also "reactive producer" section
        delay: 5  # optional delay before producing
        produce:
          queue: produce-into-topic-2
          key: "can reference as {{consumed.key}} and {{consumed.value}}"
          value: "reference from consumed: {{justName}} {{myCapturedVar}}"
          headers:
            propagated-hdr: '{{consumed.headers.hdr-name}}'

Note: Validating the consumer and triggering the producing would work for “reactive producer”, too.

Extended Features

Asynchronous Index

For a configuration file like:

management:
  port: 8000
services:
  - name: Kafka Mock Actors
    type: kafka
    address: localhost:9092
    actors:
      - name: on-demand-1
        produce:
          queue: on-demand1
          key: somekey or null
          value: "@value/from/file.json"
      - name: validate-consume-1
        consume:
          queue: another-queue-name
          group: "consumer-group"
          key: matching keys
          value: "expected value"
          capture: 10

the /async management endpoint, returns an index that contains producers and consumers lists:

$ curl http://localhost:8000/async
{
  "producers": [
    {
      "type": "kafka",
      "name": "on-demand-1",
      "index": 0,
      "queue": "on-demand1",
      "producedMessages": 0,
      "lastProduced": null
    }
  ],
  "consumers": [
    {
      "type": "kafka",
      "name": "validate-consume-1",
      "index": 0,
      "queue": "another-queue-name",
      "captured": 0,
      "consumedMessages": 0,
      "lastConsumed": null
    }
  ]
}

captured field resembles the number of consumed messages that stored in the buffer while consumedMessages field resembles the number of all consumed messages throught the message stream.

Multiple Payloads

Similar to the multiple responses in HTTP, asynchronous producers support tagged payloads. Which means they can be a list of queue, key, value, headers combinations (JSON array) instead of being a single combination (JSON object):

produce:
  - queue: topicA
    key: keyA-1
    value: valueA-1
    headers:
      hdrA-1: valA-1
  - queue: topicA
    key: keyA-2
    value: valueA-2
    headers:
      hdrA-2: valA-2
  1. trigger: valueA-1 is produced as value.
  2. trigger: valueA-2 is produced as value.
  3. trigger: valueA-1 is produced as value.

By default a producer loops through its payloads indefinitely for each trigger. To disable this behavior, set multiPayloadsLooped to false similar to multiResponsesLooped in HTTP.

Note: Supplying different topics for multiple payloads throws a compile-time error.

Tagged Payloads

Similar to the tagged responses in HTTP, it’s possible to select certain payload or payloads using the tag field:

produce:
  - queue: topicA
    key: keyA-1
    value: valueA-1
    headers:
      hdrA-1: valA-1
  - queue: topicA
    tag: async-tagA-3
    key: keyA-3
    value: valueA-3
    headers:
      hdrA-3: valA-3
  - queue: topicA
    key: keyA-2
    value: valueA-2
    headers:
      hdrA-2: valA-2

Tags” is a generic feature so Setting Current Tag and Resetting Iterators are valid for asynchronous tags too.

Datasets

Similar to the datasets in HTTP, one can put a dataset field under actor to specify a list of key-value combinations to inject into response templating.

This field can be a string that starts with @ to indicate a path to an external JSON file like @subdir/dataset.json or an array:

dataset:
  - var1: val1
  - var1: val2
produce:
  - queue: topic1
    key: key1
    value: "dset: {{var1}}"
  1. trigger: dset: val1 is produced as value.
  2. trigger: dset: val2 is produced as value.
  3. trigger: dset: val1 is produced as value.

By default a producer loops through the given dataset indefinitely for each trigger. To disable this behavior, set datasetLooped to false similar to datasetLooped in HTTP.

Supported Backends

Mockintosh supports three different asynchronous backends; Apache Kafka, AMQP and Redis.

Apache Kafka

To be able to work with Apache Kafka, these two fields should be specified in a service:

type: kafka
address: localhost:9092

The kafka value for the type field is a keyword and <HOST>:<PORT> configuration in the address field should match to the Apache Kafka instance’s hostname/IP and port.

Advanced Message Queuing Protocol (AMQP)

AMQP as an OASIS standard is a widely accepted protocol accross the message broker software such as;

and cloud services such as;

To be able to work with AMQP, these two fields should be specified in a service:

type: amqp
address: localhost:5672

The amqp value for the type field is a keyword and <HOST>:<PORT> configuration in the address field should match to the AMQP target hostname/IP and port.

Note: rabbitmq and activemq as a value for the type field instead of amqp is also accepted.

Properties Object

It’s possible to supply an additional amqpProperties field to fill the keyword arguments in pika.spec.BasicProperties except the headers argument. headers argument is set by just the common headers field.

So for example, to mock the convertAndSend function of org.springframework.amqp.rabbit.core.RabbitTemplate class you would simply use these amqpProperties:

queue: queue1
value: '{"message":"OK"}'
amqpProperties:
  priority: 0
  delivery_mode: 2
  content_encoding: "UTF-8"
  content_type: "application/json"
headers:
  __TypeId__: "com.example.ClassName"

Redis

Redis as an in-memory database, can also be used as a message queue. But note that; key and headers fields are ignored for Redis type asynchronous services since Redis does not have such concepts. Message queue functionality through Redis is achived by PyRSMQ package.

To be able to work with Redis, these two fields should be specified in a service:

type: redis
address: localhost:6379

The redis value for the type field is a keyword and <HOST>:<PORT> configuration in the address field should match to the Redis instance’s hostname/IP and port.

Google Cloud Pub/Sub

Note: Reinstall Mockintosh with pip3 install mockintosh[cloud] to have the google-cloud-pubsub optional Python package dependency.

Google Cloud Pub/Sub is a message queue cloud service of Google. There are several ways to work with Pub/Sub;

Testing against Google Cloud Platform

One is specifying the project ID and the path to the service account JSON file in your Mockintosh config:

type: gpubsub
address: project-id-111111@/path/to/project-id-111111.json

The other way is setting the environment variables GOOGLE_APPLICATION_CREDENTIALS and GOOGLE_CLOUD_PROJECT while setting service type to gpubsub:

$ GOOGLE_APPLICATION_CREDENTIALS="/path/to/project-id-111111.json" \
    GOOGLE_CLOUD_PROJECT="project-id-111111" \
    mockintosh config.yaml

Testing against Pub/Sub emulator

Google provides an emulator for Pub/Sub and can be installed by following this official tutorial. Alternatively, there are community Docker images like messagebird/gcloud-pubsub-emulator which can be used instead of installing the emulator locally.

Once the Pub/Sub emulator is up and running, you can start Mockintosh by setting PUBSUB_EMULATOR_HOST and PUBSUB_PROJECT_ID environment variables while setting service type to gpubsub:

$ PUBSUB_EMULATOR_HOST="localhost:8681" \
    PUBSUB_PROJECT_ID="project-id" \
    mockintosh config.yaml

Amazon Simple Queue Service

Note: Reinstall Mockintosh with pip3 install mockintosh[cloud] to have the boto3 optional Python package dependency.

Amazon Simple Queue Service is a message queue cloud service of Amazon. There are several ways to work with Amazon SQS;

Testing against AWS

One is specifying the AWS credentials, a legacy endpoint and the region as a URI format <SCHEME>://<AWS_ACCESS_KEY_ID>:<AWS_SECRET_ACCESS_KEY>@<ENDPOINT>:<PORT>#<REGION> in the address field:

The <SCHEME>/<PORT> combination can be one of these:

<SCHEME> <PORT> Description
http 80 use_ssl is false
https 443 use_ssl is true
type: amazonsqs
address: https://<AWS_ACCESS_KEY_ID>:<AWS_SECRET_ACCESS_KEY>@us-east-2.queue.amazonaws.com:443#us-east-2

The <AWS_ACCESS_KEY_ID> and <AWS_SECRET_ACCESS_KEY> parts of the address are omitted if the AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY environment variables are set.

$ AWS_ACCESS_KEY_ID="<AWS_ACCESS_KEY_ID>" \
    AWS_SECRET_ACCESS_KEY="<AWS_SECRET_ACCESS_KEY>" \
    mockintosh config.yaml

Testing against ElasticMQ

ElasticMQ is an in-memory message queue with an Amazon SQS-compatible interface. Such that it provides a way to test Amazon SQS integration behavior locally.

You can directly run ElasticMQ using Java java -jar elasticmq-server-X.Y.Z.jar or you can use the softwaremill/elasticmq Docker image.

Use the address below to establish a connection to local ElasticMQ instance:

type: amazonsqs
address: http://localhost:9324#elasticmq

The Message Queuing Telemetry Transport (MQTT)

MQTT is an OASIS standard messaging protocol for the Internet of Things (IoT). Some of the most notable message brokers that supports MQTT are:

To be able to work with MQTT, these two fields should be specified in a service:

type: mqtt
address: localhost:1883

The mqtt value for the type field is a keyword and <HOST>:<PORT> configuration in the address field should match to the MQTT target hostname/IP and port.