Mocks for microservice environments
Mockintosh offers a “Mock Actor” approach for use with asynchronous message bus technologies such as Apache Kafka, RabbitMQ, Apache ActiveMQ, and Redis, or cloud services such as Google Cloud Pub/Sub and Amazon SQS. See supported backends. The “Mock Actor” approach requires you to provide a deployed message bus instance and to configure a valid address for it in the configuration file.
Each Mock Actor is an entity that can be configured to produce messages, consume messages, or both. The Management API/UI is used to trigger producers and to inspect what consumers have captured.
To start creating Mock Actors in the Mockintosh config, configure a service with the type option set to kafka:
services:
  - name: Kafka Mock Actors
    type: kafka
    address: localhost:9092 # broker string of pre-existing Kafka instance
    ssl: true
    actors: [ ] # here we will configure the actors, see below
If you’re connecting to a Kafka cluster through SSL, set the ssl field to true. SSL is disabled by default.
Note: The address field of asynchronous services supports templating, so the address can be fetched from an environment variable, for example: address: "{{env 'KAFKA' 'localhost:9092'}}"
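To illustrate the fallback that the env template above implies, here is a minimal Python sketch. It assumes the second argument is used as a default when the variable is unset; resolve_address is a hypothetical helper for illustration and not part of Mockintosh.

```python
import os


def resolve_address(var_name, default, environ=None):
    """Return the value of var_name, or default when the variable is unset.

    Hypothetical helper: models the assumed behaviour of
    {{env 'KAFKA' 'localhost:9092'}}.
    """
    environ = os.environ if environ is None else environ
    return environ.get(var_name, default)


# With KAFKA unset, the default broker string is used.
print(resolve_address("KAFKA", "localhost:9092", environ={}))
# With KAFKA set, the environment wins.
print(resolve_address("KAFKA", "localhost:9092", environ={"KAFKA": "broker:9093"}))
```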
Below are the configuration patterns for Mock Actors:
Below is the configuration snippet for a Mock Actor that produces the configured message every delay seconds, up to limit times. The delay option is the key here: it distinguishes a scheduled producer from an “on-demand producer”.
services:
  - name: Kafka Mock Actors
    type: kafka
    address: localhost:9092
    actors:
      - name: scheduled-producer-1 # just the name
        produce:
          queue: scheduled-queue1 # topic/queue name
          key: "message key, can be null"
          value: "message value"
          headers:
            constant: justvalue
            timestamp: '{{date.timestamp}}' # regular Mockintosh templating can be used
        delay: 5 # seconds between producing
        limit: 100 # limit of how many messages to produce, optional
You can use most of the Mockintosh templating equations, with the exception of those that depend on a request.
Queue name and header names cannot be templated.
Note: Be careful when setting delay to a low value with no limit option; this may cause your message bus to run out of memory or disk space.
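To see why a low delay without a limit can fill a broker quickly, a rough calculation helps. The sketch below is plain arithmetic; the function names and the 1 KiB average message size are illustrative assumptions, not Mockintosh settings.

```python
SECONDS_PER_DAY = 86_400


def messages_per_day(delay_seconds):
    """Messages emitted in 24 hours by a producer firing every delay_seconds."""
    if delay_seconds <= 0:
        raise ValueError("delay must be positive")
    return SECONDS_PER_DAY // delay_seconds


def bytes_per_day(delay_seconds, avg_message_bytes=1024):
    """Rough payload volume per day, ignoring broker overhead and retention."""
    return messages_per_day(delay_seconds) * avg_message_bytes


print(messages_per_day(5))  # delay: 5, as in the snippet above -> 17280
print(bytes_per_day(1))     # delay: 1 with 1 KiB messages -> 88473600
```

With delay: 5 and limit: 100 the producer stops after 100 messages, so the growth is bounded; without limit it continues at the daily rate shown.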
An on-demand producer is basically a scheduled producer with no delay option. Instead of producing messages automatically, this kind of actor waits for a Management API call to trigger the message push.
management:
  port: 8000
services:
  - name: Kafka Mock Actors
    type: kafka
    address: localhost:9092
    actors:
      - name: on-demand-1
        produce:
          create: true
          queue: on-demand1
          key: somekey or null
          value: "@value/from/file.json" # it's possible to reference file
Now, to trigger producing the message on demand, issue an API call using the actor’s name, like this:
curl -X POST http://localhost:8000/async/producers/on-demand-1
The response to this request would be:
{
"type": "kafka",
"name": "on-demand-1",
"index": 0,
"queue": "on-demand1",
"lastProduced": 1618695168.6416173
}
Note: The create: true flag enables topic creation if the given topic does not exist yet.
It’s also possible to select the producer by its index number instead of the actor’s name:
curl -X POST http://localhost:8000/async/producers/0
Note: The limit option actually works for any kind of producer.
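The same trigger can be issued from a test script. The following is a minimal sketch using only the Python standard library; it assumes the Management API listens on http://localhost:8000 as in the config above, and the helper names are hypothetical.

```python
import json
import urllib.request
from urllib.parse import quote


def build_trigger_request(base_url, producer):
    """Build the POST request that triggers a producer by name or index."""
    url = f"{base_url.rstrip('/')}/async/producers/{quote(str(producer), safe='')}"
    return urllib.request.Request(url, method="POST")


def trigger_producer(base_url, producer):
    """Send the request and decode the JSON reply (needs a running Mockintosh)."""
    with urllib.request.urlopen(build_trigger_request(base_url, producer)) as resp:
        return json.load(resp)


req = build_trigger_request("http://localhost:8000", "on-demand-1")
print(req.get_method(), req.full_url)
```

Calling trigger_producer("http://localhost:8000", 0) would select the producer by index instead of by name.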
It’s possible to trigger an asynchronous producer from an HTTP endpoint using the triggerAsyncProducer field.
The value of this field can be either a non-negative integer that indicates the global index of a producer or
a string that selects the producer by its name:
management:
  port: 8000
services:
  - name: Mock for Service1
    port: 8001
    endpoints:
      - path: "/endp1"
        response:
          body: "endp1"
          triggerAsyncProducer: 0
      - path: "/endp2"
        response:
          body: "endp2"
          triggerAsyncProducer: on-demand-1
  - name: Kafka Mock Actors
    type: kafka
    address: localhost:9092
    actors:
      - name: on-demand-1
        produce:
          create: true
          queue: on-demand1
          key: somekey or null
          value: "@value/from/file.json" # it's possible to reference file
The producer on-demand-1, which is linked to the HTTP endpoints via the triggerAsyncProducer field, is triggered
whenever an HTTP request matches one of those endpoints and the related response is returned.
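The two accepted forms of triggerAsyncProducer (index or name) can be modelled as a simple lookup. This is only a sketch of the selection rule described above; select_producer and the in-memory producers list are hypothetical and do not reflect Mockintosh internals.

```python
def select_producer(producers, selector):
    """Pick a producer by global index (int) or by actor name (str)."""
    if isinstance(selector, int):
        if 0 <= selector < len(producers):
            return producers[selector]
        raise LookupError(f"no producer at index {selector}")
    for producer in producers:
        if producer.get("name") == selector:
            return producer
    raise LookupError(f"no producer named {selector!r}")


producers = [{"name": "on-demand-1", "queue": "on-demand1"}]
# Both endpoints in the config above resolve to the same producer.
print(select_producer(producers, 0) is select_producer(producers, "on-demand-1"))
```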
The “validating consumer” actor is used when you need to check that a service has published a message on the bus. For example, your service accepts a REST call and puts a message on the bus, and you are validating this behavior in an automated test. Again, this validation requires the Management API to be enabled. Let’s see a snippet:
management:
  port: 8000
services:
  - name: Kafka Mock Actors
    type: kafka
    address: localhost:9092
    actors:
      - name: validate-consume-1
        consume:
          queue: another-queue-name # topic/queue name to subscribe to, required
          group: "consumer-group" # optional consumer group
          key: matching keys # expected key, optional
          value: "expected value" # optional
          capture: 10 # limit len of messages to store for validation, optional, default: 1
To validate that a message has appeared on the bus, query the Management API endpoint like this:
curl http://localhost:8000/async/consumers/validate-consume-1
The response is a JSON document containing the list of captured messages in
the HAR format, quite similar to the responses you can see
in Traffic Logs. The traffic logging is specific to the selected consumer. The consumer
stores the last N messages, according to its capture setting.
To clear the captured message list, issue a DELETE call on the same URL:
curl -X DELETE http://localhost:8000/async/consumers/validate-consume-1
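In an automated test, the two calls above can be wrapped in small helpers. The sketch below uses only the Python standard library. It assumes the reply follows the standard HAR layout with a top-level log object holding an entries list; the helper names are hypothetical.

```python
import json
import urllib.request


def consumer_request(base_url, consumer, method="GET"):
    """Build a GET (read captured messages) or DELETE (clear them) request."""
    url = f"{base_url.rstrip('/')}/async/consumers/{consumer}"
    return urllib.request.Request(url, method=method)


def captured_entries(har):
    """Return the HAR entries list, or an empty list when nothing was captured."""
    return har.get("log", {}).get("entries", [])


def fetch_captured(base_url, consumer):
    """Query a running Mockintosh instance and return the captured entries."""
    with urllib.request.urlopen(consumer_request(base_url, consumer)) as resp:
        return captured_entries(json.load(resp))


# Offline demonstration with a hand-written HAR-shaped document.
sample = {"log": {"version": "1.2", "entries": [{"comment": "one captured message"}]}}
print(len(captured_entries(sample)))
```

A test would typically send the DELETE request first, exercise the service under test, and then assert that fetch_captured returns at least one entry.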
To narrow down the expected message, you can use regular matching equations in key, value
or headers values:
management:
  port: 8000
services:
  - name: Kafka Mock Actors
    type: kafka
    address: localhost:9092
    actors:
      - name: validate-consume-2
        consume:
          queue: another-queue-name
          key: "{{regEx 'prefix-(.*)'}}"
          value: "expected prefix-{{justName}}" # see also "reactive producer" section
          headers:
            hdr-name: "{{regEx 'prefix-(.+)-suffix' 'myCapturedVar'}}" # see also "reactive producer" section
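The effect of a regEx matcher with a capture name can be pictured with Python’s re module. This is a sketch only: whether Mockintosh anchors the pattern to the whole value is not stated here, so the use of re.fullmatch is an assumption, and match_and_capture is a hypothetical helper.

```python
import re


def match_and_capture(pattern, text, names=()):
    """Return captured groups keyed by the given names, or None on mismatch."""
    match = re.fullmatch(pattern, text)  # assumption: the whole value must match
    if match is None:
        return None
    return dict(zip(names, match.groups()))


# A header that satisfies the pattern yields the captured variable.
print(match_and_capture(r"prefix-(.+)-suffix", "prefix-abc-suffix", ["myCapturedVar"]))
# A header that does not match means the message is not accepted.
print(match_and_capture(r"prefix-(.+)-suffix", "something-else", ["myCapturedVar"]))
```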
The value field supports JSON Schema matching, much like in HTTP,
except that the schema field is used instead of, or together with, the value field:
- consume:
    queue: validate-consume-3
    key: "{{regEx 'prefix-(.*)'}}"
    schema:
      type: object
      properties:
        somekey: { }
      required:
        - somekey
    headers:
      hdr-name: "{{regEx 'prefix-(.+)-suffix' 'myCapturedVar'}}"
If the schema field is used together with the value field, both are taken into account as matching criteria.
Referencing a JSON Schema from an external file is also supported:
- consume:
    queue: validate-consume-3
    key: "{{regEx 'prefix-(.*)'}}"
    schema: "@path/to/schema.json"
    headers:
      hdr-name: "{{regEx 'prefix-(.+)-suffix' 'myCapturedVar'}}"
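To make the inline schema shown earlier concrete, here is a deliberately simplified stand-in written with the standard library only. It checks just the two constraints used there (type: object and required) and is not a JSON Schema validator; matches_required_object is a hypothetical name.

```python
import json


def matches_required_object(raw_value, required):
    """True if raw_value is a JSON object containing every required key."""
    try:
        document = json.loads(raw_value)
    except json.JSONDecodeError:
        return False  # not JSON at all, so it cannot satisfy the schema
    return isinstance(document, dict) and all(key in document for key in required)


print(matches_required_object('{"somekey": 1}', ["somekey"]))   # object with the key
print(matches_required_object('{"otherkey": 1}', ["somekey"]))  # key is missing
print(matches_required_object('[1, 2, 3]', ["somekey"]))        # not an object
```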
By combining the “validating consumer” and “on-demand producer” actor types, you get a behavior where a message
is produced in “reaction” to another message consumed from the bus. You can also specify a delay between consuming
and producing, to simulate some “processing time”.
services:
  - name: Kafka Mock Actors
    type: kafka
    address: localhost:9092
    actors:
      - name: reactive-producer-1
        consume:
          queue: consume-from-topic-1
          key: "{{regEx 'prefix-(.*)'}}"
          value: "expected prefix-{{justName}}" # justName is referenced in produce below
          headers:
            hdr-name: "{{regEx 'prefix-(.+)-suffix' 'myCapturedVar'}}" # myCapturedVar is referenced in produce below
        delay: 5 # optional delay before producing
        produce:
          queue: produce-into-topic-2
          key: "can reference as {{consumed.key}} and {{consumed.value}}"
          value: "reference from consumed: {{justName}} {{myCapturedVar}}"
          headers:
            propagated-hdr: '{{consumed.headers.hdr-name}}'
Note: Validating the consumer and triggering the producer through the Management API also work for a “reactive producer”.
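The substitution step of a reactive producer can be sketched as follows. This toy renderer only replaces {{name}} placeholders and is not Mockintosh’s template engine; render and build_reaction are hypothetical names, and the captured variables are passed in as if matching had already succeeded.

```python
import re

PLACEHOLDER = re.compile(r"\{\{\s*([\w.\-]+)\s*\}\}")


def render(template, context):
    """Replace {{name}} with context[name]; unknown names are left untouched."""
    return PLACEHOLDER.sub(lambda m: str(context.get(m.group(1), m.group(0))), template)


def build_reaction(consumed, captured):
    """Build the produced message from the consumed one and captured variables."""
    context = dict(captured)
    context["consumed.key"] = consumed["key"]
    context["consumed.value"] = consumed["value"]
    for name, value in consumed.get("headers", {}).items():
        context[f"consumed.headers.{name}"] = value
    return {
        "queue": "produce-into-topic-2",
        "key": render("can reference as {{consumed.key}} and {{consumed.value}}", context),
        "value": render("reference from consumed: {{justName}} {{myCapturedVar}}", context),
        "headers": {"propagated-hdr": render("{{consumed.headers.hdr-name}}", context)},
    }


consumed = {
    "key": "prefix-k1",
    "value": "expected prefix-abc",
    "headers": {"hdr-name": "prefix-xyz-suffix"},
}
print(build_reaction(consumed, {"justName": "abc", "myCapturedVar": "xyz"}))
```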
For a configuration file like:
management:
  port: 8000
services:
  - name: Kafka Mock Actors
    type: kafka
    address: localhost:9092
    actors:
      - name: on-demand-1
        produce:
          queue: on-demand1
          key: somekey or null
          value: "@value/from/file.json"
      - name: validate-consume-1
        consume:
          queue: another-queue-name
          group: "consumer-group"
          key: matching keys
          value: "expected value"
          capture: 10
the /async management endpoint returns an index that contains the producers and consumers lists:
$ curl http://localhost:8000/async
{
"producers": [
{
"type": "kafka",
"name": "on-demand-1",
"index": 0,
"queue": "on-demand1",
"producedMessages": 0,
"lastProduced": null
}
],
"consumers": [
{
"type": "kafka",
"name": "validate-consume-1",
"index": 0,
"queue": "another-queue-name",
"captured": 0,
"consumedMessages": 0,
"lastConsumed": null
}
]
}
The captured field represents the number of consumed messages currently stored in the buffer, while the consumedMessages field
represents the number of all messages consumed throughout the message stream.
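A test can use this index to look up an actor and read its counters. The sketch below works on a trimmed copy of the sample response shown above, so it runs without a Mockintosh instance; find_actor is a hypothetical helper.

```python
import json

# Trimmed copy of the sample /async response shown above.
INDEX = json.loads("""
{
  "producers": [
    {"type": "kafka", "name": "on-demand-1", "index": 0, "queue": "on-demand1",
     "producedMessages": 0, "lastProduced": null}
  ],
  "consumers": [
    {"type": "kafka", "name": "validate-consume-1", "index": 0,
     "queue": "another-queue-name", "captured": 0, "consumedMessages": 0,
     "lastConsumed": null}
  ]
}
""")


def find_actor(index, kind, name):
    """Return the entry named `name` from index[kind], or None if absent."""
    for entry in index.get(kind, []):
        if entry.get("name") == name:
            return entry
    return None


consumer = find_actor(INDEX, "consumers", "validate-consume-1")
print(consumer["captured"], consumer["consumedMessages"])
```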
Similar to the multiple responses in HTTP, asynchronous producers support multiple
payloads. This means that produce can be a list of queue, key, value, headers combinations (a JSON array)
instead of a single combination (a JSON object):
produce:
  - queue: topicA
    key: keyA-1
    value: valueA-1
    headers:
      hdrA-1: valA-1
  - queue: topicA
    key: keyA-2
    value: valueA-2
    headers:
      hdrA-2: valA-2
With this configuration, the first produced message has valueA-1 as value, the second has valueA-2, and the third has valueA-1 again.
By default a producer loops through its payloads indefinitely, moving to the next payload on each trigger. To disable this behavior, set
multiPayloadsLooped to false, similar to multiResponsesLooped in HTTP.
Note: Supplying different topics for multiple payloads throws a compile-time error.
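The looping rule can be modelled with a small iterator. This is a sketch of the described behavior, not Mockintosh code; in particular, returning None once a non-looped list is exhausted is an assumption, since what Mockintosh does at that point is not described here.

```python
class PayloadIterator:
    """Hands out payloads one per trigger, optionally wrapping around."""

    def __init__(self, payloads, looped=True):
        self.payloads = list(payloads)
        self.looped = looped
        self.position = 0

    def next_payload(self):
        if not self.payloads:
            return None
        if self.position >= len(self.payloads):
            if not self.looped:
                return None  # assumption: nothing more to produce
            self.position = 0
        payload = self.payloads[self.position]
        self.position += 1
        return payload


payloads = [{"value": "valueA-1"}, {"value": "valueA-2"}]
looped = PayloadIterator(payloads)
print([looped.next_payload()["value"] for _ in range(3)])
```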
Similar to the tagged responses in HTTP, it’s possible to select a certain payload or
payloads using the tag field:
produce:
  - queue: topicA
    key: keyA-1
    value: valueA-1
    headers:
      hdrA-1: valA-1
  - queue: topicA
    tag: async-tagA-3
    key: keyA-3
    value: valueA-3
    headers:
      hdrA-3: valA-3
  - queue: topicA
    key: keyA-2
    value: valueA-2
    headers:
      hdrA-2: valA-2
“Tags” is a generic feature so Setting Current Tag and Resetting Iterators are valid for asynchronous tags too.
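One way to picture tag selection is as a filter applied before the payloads are iterated. The sketch below assumes the same rule as for tagged HTTP responses, namely that untagged payloads are always eligible and a tagged payload is eligible only while its tag is the current one; that rule and the eligible_payloads helper are assumptions made for illustration.

```python
def eligible_payloads(payloads, current_tag=None):
    """Keep untagged payloads plus those whose tag equals current_tag."""
    return [p for p in payloads if "tag" not in p or p["tag"] == current_tag]


payloads = [
    {"key": "keyA-1", "value": "valueA-1"},
    {"key": "keyA-3", "value": "valueA-3", "tag": "async-tagA-3"},
    {"key": "keyA-2", "value": "valueA-2"},
]

print([p["key"] for p in eligible_payloads(payloads)])
print([p["key"] for p in eligible_payloads(payloads, "async-tagA-3")])
```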
Similar to the datasets in HTTP, you can put a dataset field under an actor
to specify a list of key-value combinations to inject into templating.
This field can be a string that starts with @ to indicate a path to an external JSON file, like @subdir/dataset.json,
or an array:
dataset:
  - var1: val1
  - var1: val2
produce:
  - queue: topic1
    key: key1
    value: "dset: {{var1}}"
With this configuration, the first produced message has dset: val1 as value, the second has dset: val2, and the third has dset: val1 again.
By default a producer loops through the given dataset indefinitely, moving to the next row on each trigger. To disable this behavior, set
datasetLooped to false, similar to datasetLooped in HTTP.
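The interaction between a dataset and a templated value can be sketched with itertools.cycle. The plain string replacement below stands in for Mockintosh templating, and produced_values is a hypothetical helper.

```python
from itertools import cycle, islice


def produced_values(template, dataset, triggers):
    """Render the template once per trigger, cycling through dataset rows."""
    values = []
    for row in islice(cycle(dataset), triggers):
        value = template
        for name, replacement in row.items():
            value = value.replace("{{" + name + "}}", str(replacement))
        values.append(value)
    return values


dataset = [{"var1": "val1"}, {"var1": "val2"}]
print(produced_values("dset: {{var1}}", dataset, 3))
```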
Mockintosh supports the following asynchronous backends: Apache Kafka, AMQP, Redis, Google Cloud Pub/Sub, Amazon SQS, and MQTT.
To be able to work with Apache Kafka, these two fields should be specified in a service:
type: kafka
address: localhost:9092
The kafka value for the type field is a keyword, and the <HOST>:<PORT> configuration in the address field should match
the Apache Kafka instance’s hostname/IP and port.
AMQP, an OASIS standard, is a protocol widely accepted across message broker software, such as RabbitMQ and Apache ActiveMQ,
and across cloud services.
To be able to work with AMQP, these two fields should be specified in a service:
type: amqp
address: localhost:5672
The amqp value for the type field is a keyword, and the <HOST>:<PORT> configuration in the address field should match
the AMQP target hostname/IP and port.
Note: rabbitmq and activemq are also accepted as values for the type field instead of amqp.
It’s possible to supply an additional amqpProperties field to fill the keyword arguments
of pika.spec.BasicProperties,
except for the headers argument, which is set only by the common headers field.
For example, to mock the convertAndSend function of the org.springframework.amqp.rabbit.core.RabbitTemplate class, you would use these
amqpProperties:
queue: queue1
value: '{"message":"OK"}'
amqpProperties:
  priority: 0
  delivery_mode: 2
  content_encoding: "UTF-8"
  content_type: "application/json"
headers:
  __TypeId__: "com.example.ClassName"
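The split between amqpProperties and the common headers field can be pictured as building one keyword-argument dictionary. The sketch below avoids importing pika so that it runs anywhere; basic_properties_kwargs is a hypothetical helper, and dropping a headers key found inside amqpProperties is an assumption based on the rule stated above.

```python
def basic_properties_kwargs(payload):
    """Merge amqpProperties with the common headers field.

    Any "headers" key inside amqpProperties is dropped, because headers
    are taken only from the common headers field.
    """
    kwargs = dict(payload.get("amqpProperties", {}))
    kwargs.pop("headers", None)
    kwargs["headers"] = dict(payload.get("headers", {}))
    return kwargs


payload = {
    "queue": "queue1",
    "value": '{"message":"OK"}',
    "amqpProperties": {
        "priority": 0,
        "delivery_mode": 2,
        "content_encoding": "UTF-8",
        "content_type": "application/json",
    },
    "headers": {"__TypeId__": "com.example.ClassName"},
}

kwargs = basic_properties_kwargs(payload)
print(kwargs)
# With pika installed, the result could be passed on as:
#   pika.BasicProperties(**kwargs)
```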
Redis, an in-memory database, can also be used as a message queue. Note, however, that the
key and headers fields are ignored for Redis type asynchronous services, since Redis does not have
such concepts. Message queue functionality through Redis
is achieved with the PyRSMQ package.
To be able to work with Redis, these two fields should be specified in a service:
type: redis
address: localhost:6379
The redis value for the type field is a keyword, and the <HOST>:<PORT> configuration in the address field should match
the Redis instance’s hostname/IP and port.
Note: Reinstall Mockintosh with pip3 install mockintosh[cloud] to have the google-cloud-pubsub optional Python package dependency.
Google Cloud Pub/Sub is Google’s message queue cloud service. There are several ways to work with Pub/Sub.
One is to specify the project ID and the path to the service account JSON file in your Mockintosh config:
type: gpubsub
address: project-id-111111@/path/to/project-id-111111.json
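The address above packs two values into one string, separated by @. The following sketch shows how such a string splits into its parts; parse_gpubsub_address is a hypothetical helper for illustration, not the parser Mockintosh uses.

```python
def parse_gpubsub_address(address):
    """Split '<PROJECT_ID>@<PATH_TO_JSON>' into (project_id, credentials_path)."""
    project_id, separator, credentials_path = address.partition("@")
    if not separator or not project_id or not credentials_path:
        raise ValueError(f"expected <PROJECT_ID>@<PATH>, got {address!r}")
    return project_id, credentials_path


print(parse_gpubsub_address("project-id-111111@/path/to/project-id-111111.json"))
```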
The other way is setting the environment variables GOOGLE_APPLICATION_CREDENTIALS and GOOGLE_CLOUD_PROJECT
while setting service type to gpubsub:
$ GOOGLE_APPLICATION_CREDENTIALS="/path/to/project-id-111111.json" \
GOOGLE_CLOUD_PROJECT="project-id-111111" \
mockintosh config.yaml
Google provides an emulator for Pub/Sub, which can be installed by following this official tutorial. Alternatively, there are community Docker images, like messagebird/gcloud-pubsub-emulator, which can be used instead of installing the emulator locally.
Once the Pub/Sub emulator is up and running, you can start Mockintosh
by setting the PUBSUB_EMULATOR_HOST and PUBSUB_PROJECT_ID environment variables while setting the service type to gpubsub:
$ PUBSUB_EMULATOR_HOST="localhost:8681" \
PUBSUB_PROJECT_ID="project-id" \
mockintosh config.yaml
Note: Reinstall Mockintosh with pip3 install mockintosh[cloud] to have the boto3 optional Python package dependency.
Amazon Simple Queue Service is Amazon’s message queue cloud service. There are several ways to work with Amazon SQS.
One is to specify the AWS credentials, a legacy endpoint, and the region in the address field, using the URI format <SCHEME>://<AWS_ACCESS_KEY_ID>:<AWS_SECRET_ACCESS_KEY>@<ENDPOINT>:<PORT>#<REGION>.
The <SCHEME>/<PORT> combination can be one of these:
| <SCHEME> | <PORT> | Description |
|---|---|---|
| http | 80 | use_ssl is false |
| https | 443 | use_ssl is true |
type: amazonsqs
address: https://<AWS_ACCESS_KEY_ID>:<AWS_SECRET_ACCESS_KEY>@us-east-2.queue.amazonaws.com:443#us-east-2
The <AWS_ACCESS_KEY_ID> and <AWS_SECRET_ACCESS_KEY> parts of the address can be omitted if the
AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY environment variables are set.
$ AWS_ACCESS_KEY_ID="<AWS_ACCESS_KEY_ID>" \
AWS_SECRET_ACCESS_KEY="<AWS_SECRET_ACCESS_KEY>" \
mockintosh config.yaml
ElasticMQ is an in-memory message queue with an Amazon SQS-compatible interface, so it provides a way to test Amazon SQS integration behavior locally.
You can run ElasticMQ directly with Java (java -jar elasticmq-server-X.Y.Z.jar) or you can use
the softwaremill/elasticmq Docker image.
Use the address below to establish a connection to a local ElasticMQ instance:
type: amazonsqs
address: http://localhost:9324#elasticmq
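Both amazonsqs address forms shown in this section follow ordinary URI syntax, so their parts can be inspected with urllib.parse. The sketch below illustrates the format only; parse_sqs_address is a hypothetical helper, and the environment-variable fallback mirrors the rule described above rather than Mockintosh’s actual code. A secret key that contains characters such as / would need escaping before being embedded in a URI like this.

```python
import os
from urllib.parse import urlsplit


def parse_sqs_address(address, environ=None):
    """Break an amazonsqs address into its components."""
    environ = os.environ if environ is None else environ
    parts = urlsplit(address)
    return {
        "use_ssl": parts.scheme == "https",
        "endpoint": parts.hostname,
        "port": parts.port,
        "region": parts.fragment,
        # Credentials in the address win; otherwise fall back to the environment.
        "access_key_id": parts.username or environ.get("AWS_ACCESS_KEY_ID"),
        "secret_access_key": parts.password or environ.get("AWS_SECRET_ACCESS_KEY"),
    }


# Placeholder credentials, for illustration only.
print(parse_sqs_address("https://KEYID:SECRET@us-east-2.queue.amazonaws.com:443#us-east-2", environ={}))
print(parse_sqs_address("http://localhost:9324#elasticmq", environ={}))
```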
MQTT is an OASIS standard messaging protocol for the Internet of Things (IoT) and is supported by a number of message brokers.
To be able to work with MQTT, these two fields should be specified in a service:
type: mqtt
address: localhost:1883
The mqtt value for the type field is a keyword, and the <HOST>:<PORT> configuration in the address field should match
the MQTT target hostname/IP and port.