How to Subscribe to Notifications

In "What are Notifications in oneM2M?" we discussed that a notification is a request that a CSE sends to a registered application when a resource is added, updated, or removed.

In this article we want to show how to subscribe to notifications. For this we will create a resource structure and subscribe to events that are raised when <contentInstance> resources are created under a <container> resource.

Preparation

We use Python 3 for the code snippets in this article. Please see "Preparations and Prerequisites" for instructions on installing the necessary packages.

We assume that a CSE runs on the local machine and can receive requests via HTTP at localhost on port 8080.

Subscribing to Notifications

First, we need to register an AE. Please see "How to Register an AE" for further information. Under the <AE> resource we create a <container> resource, which is the resource that will be monitored for changes. We then create a <subscription> resource under the <container> resource. The <subscription> resource configures the monitoring and specifies the notification target.

In the following sections we will explain the subscription code in the file main.py step-by-step.

Main script
# Import the setup variables
from setup import *                                     

# Import AE functions
from ae import register_AE, unregister_AE

# Import container function
from container import create_container

# Import subscription function
from subscription import create_subscription 

# Import contentInstance function
from contentInstance import create_contentInstance 

# Import notification function
from notificationReceiver import run_notification_receiver, stop_notification_receiver

# Start the notification server first
run_notification_receiver()

# Register an AE
if not register_AE(application_name):
    stop_notification_receiver()
    exit()

# Create a <container> resource under the <AE> resource
if not create_container(application_name, application_path, container_name):
    unregister_AE(application_name)
    stop_notification_receiver()
    exit()

# Create a <subscription> resource under the <container> resource
if not create_subscription(application_name, container_path, subscription_name, notificationURIs):
    unregister_AE(application_name)
    stop_notification_receiver()
    exit()

# Create a <contentInstance> resource under the <container> resource
if not create_contentInstance(application_name, container_path, 'Hello World!'):
    unregister_AE(application_name)
    stop_notification_receiver()
    exit()

# Unregister the AE and stop the notification server
unregister_AE(application_name)
stop_notification_receiver()
Setup variables and support functions
import random, string

# Setup variables
cse_url = 'http://localhost:8080/~/id-in/cse-in'                  # The url of the CSE
notificationURIs = ['http://localhost:7070']                # The notification target
application_name = 'CMyApplication'                         # The name of the application entity
application_path = cse_url + '/' + application_name         # The path of the application entity
container_name = 'myContainer'                              # The name of the container
container_path = application_path + '/' + container_name    # The path of the container
subscription_name = 'mySubscription'                        # The name of the subscription


def randomID() -> str:
    """ Generate a random ID of 10 alphanumeric characters.

        Return:
            String with a random ID
    """
    return ''.join(random.choices(string.ascii_uppercase + string.digits + string.ascii_lowercase, k = 10))
Functions for working with <AE> resources
from setup import cse_url, randomID
import requests

def register_AE(originator:str) -> bool:
    """ Register an Application Entity

        Args:
            originator: The originator of the request

        Returns:
            bool: True if the AE was registered successfully, False otherwise
    """

    # Set the oneM2M headers for creating the <AE> resource
    headers = {
        'Content-Type': 'application/json;ty=2',    # Type of the resource to be created
        'X-M2M-Origin': originator,                 # unique application entity identifier
        'X-M2M-RI': randomID(),                     # unique request identifier
        'X-M2M-RVI': '4' 
    }

    # Define the <AE> resource
    body = {
        'm2m:ae': {
            'rn': originator,                       # resource name, here the same as the originator
            'api': 'Nmy-application.example.com',
            'rr': True,
            'srv': ['4']
        }
    }

    # Perform the http request to create the <AE> resource
    response = requests.post(cse_url, headers=headers, json=body)

    # Check the response
    if response.status_code == 201:
        print('AE created successfully')
    else:
        print('Error creating AE: ' + str(response.status_code))
        return False

    return True


# Unregister AE
def unregister_AE(application_name:str) -> bool:
    """ Unregister an Application Entity

        Args:
            application_name: The name of the AE to unregister, also used as the originator of the request

        Returns:
            bool: True if the AE was unregistered successfully, False otherwise
    """

    # Set the oneM2M headers for deleting the <AE> resource
    headers = {
        'X-M2M-Origin': application_name,           # unique application entity identifier
        'X-M2M-RI': randomID(),                     # unique request identifier
        'X-M2M-RVI': '4' 
    }

    # Perform the http request to delete the <AE> resource
    response = requests.delete(cse_url + '/' + application_name, headers=headers)

    # Check the response
    if response.status_code == 200:
        print('AE deleted successfully')
    else:
        print('Error deleting AE: ' + str(response.status_code))
        return False

    return True
Functions for working with <container> resources
import requests
from setup import randomID

def create_container(originator:str, path:str, rn:str) -> bool:
    """ Create a <container> resource

        Args:
            originator: The originator of the request
            path: The path of the parent resource
            rn: The resource name of the <container> resource

        Returns:
            bool: True if the <container> resource was created successfully, False otherwise
    """
    # Set the oneM2M headers for creating the <container> resource
    headers = {
        'Content-Type': 'application/json;ty=3',    # Type of the resource to be created
        'X-M2M-Origin': originator,                 # unique application entity identifier
        'X-M2M-RI': randomID(),                     # unique request identifier
        'X-M2M-RVI': '4' 
    }

    # Define the <container> resource
    body = {
        'm2m:cnt': {
            'rn': rn
        }
    }

    # Perform the http request to create the <container> resource
    response = requests.post(path, headers=headers, json=body)

    # Check the response
    if response.status_code == 201:
        print('Container created successfully')
    else:
        print('Error creating container: ' + str(response.status_code))
        return False

    return True
Functions for working with <subscription> resources
import requests
from setup import randomID

def create_subscription(originator:str, path:str, rn:str, notificationURIs:list[str]) -> bool:
    """ Create a <subscription> resource

        Args:
            originator: The originator of the request
            path: The path of the parent resource
            rn: The resource name of the <subscription> resource
            notificationURIs: The list of notification targets

        Returns:
            bool: True if the <subscription> resource was created successfully, False otherwise
    """
    # Set the oneM2M headers for creating the <subscription> resource
    headers = {
        'Content-Type': 'application/json;ty=23',   # Type of the resource to be created
        'X-M2M-Origin': originator,                 # unique application entity identifier
        'X-M2M-RI': randomID(),                     # unique request identifier
        'X-M2M-RVI': '4' 
    }

    # Define the <subscription> resource
    body = {
        'm2m:sub': {
            'rn': rn,
            'enc': {
                'net': [3]
            },
            'nu': notificationURIs
        }
    }

    # Perform the http request to create the <subscription> resource
    response = requests.post(path, headers=headers, json=body)

    # Check the response
    if response.status_code == 201:
        print('Subscription created successfully')
    else:
        print('Error creating subscription: ' + str(response.status_code))
        return False

    return True
Functions for working with <contentInstance> resources
import requests
from setup import randomID

def create_contentInstance(originator:str, path:str, content:str) -> bool:
    """ Create a <contentInstance> resource

        Args:
            originator: The originator of the request
            path: The path of the parent resource
            content: The content of the <contentInstance> resource

        Returns:
            bool: True if the <contentInstance> resource was created successfully, False otherwise
    """
    # Set the oneM2M headers for creating the <contentInstance> resource
    headers = {
        'Content-Type': 'application/json;ty=4',    # Type of the resource to be created
        'X-M2M-Origin': originator,                 # unique application entity identifier
        'X-M2M-RI': randomID(),                     # unique request identifier
        'X-M2M-RVI': '4' 
    }

    # Define the <contentInstance> resource
    body = {
        'm2m:cin': {
            'con': content
        }
    }

    # Perform the http request to create the <contentInstance> resource
    response = requests.post(path, headers=headers, json=body)

    # Check the response
    if response.status_code == 201:
        print('Container created successfully')
    else:
        print('Error creating container: ' + str(response.status_code))
        return False

    return True
Basic Notification Receiver Implementation
from http.server import BaseHTTPRequestHandler, HTTPServer
from threading import Thread
import json

notification_receiver = None

class NotificationReceiver(BaseHTTPRequestHandler):
    """ The notification handler class. 
        This class handles the HTTP requests sent by the CSE to the notification receiver.
    """
    def do_POST(self):
        content_length = int(self.headers['Content-Length'])
        request_id = self.headers['X-M2M-RI']
        post_data = self.rfile.read(content_length)
        data = json.loads(post_data)
        if data['m2m:sgn'].get('vrq'):
            print('<= Verification notification request received')
        else:
            print('<= Subscription notification request received')
        print(f'<= {data}')

        self.send_response(200)
        self.send_header('X-M2M-RSC', '2000')
        self.send_header('X-M2M-RI', request_id)

        self.end_headers() 


    def log_message(self, format:str, *args:object) -> None:
        # Ignore log messages
        pass

def run_notification_receiver(port=7070) -> None:
    """ This function starts the notification receiver on the specified port.
        The notification receiver will run in a separate thread.

        Args:
            port: The port on which the notification receiver will listen
    """
    global notification_receiver
    server_address = ('', port)
    notification_receiver = HTTPServer(server_address, NotificationReceiver)
    print(f'Starting notification receiver on port {port}...')
    Thread(target=notification_receiver.serve_forever).start()


def stop_notification_receiver() -> None:
    """ Stop the notification receiver.
    """
    global notification_receiver
    if notification_receiver:
        notification_receiver.shutdown()
        notification_receiver = None
        print('Notification receiver stopped')

Start the Notification Receiver

The first step in the script is to start the notification receiver. This receiver is a simple HTTP server that listens on port 7070. It is implemented in the file notificationReceiver.py and is started in the main script main.py by calling the run_notification_receiver() function.

A notification receiver is a part of an AE that is able to receive notifications from a CSE. The notification receiver is implemented as an HTTP server that listens on a specific port.
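Before involving a CSE, it can help to check that a receiver answers the way a CSE expects. The following is a minimal sketch, not part of the article's scripts: EchoHandler and send_test_notification are hypothetical names, the handler is a reduced stand-in for the NotificationReceiver class, and the server binds to an OS-chosen free port instead of 7070 so it does not collide with a running receiver.

```python
import json
from http.server import BaseHTTPRequestHandler, HTTPServer
from threading import Thread
from urllib.request import ProxyHandler, Request, build_opener


class EchoHandler(BaseHTTPRequestHandler):
    """Hypothetical reduced receiver: acknowledge the request and echo X-M2M-RI."""

    def do_POST(self):
        length = int(self.headers.get('Content-Length', 0))
        self.rfile.read(length)                          # consume the request body
        self.send_response(200)
        self.send_header('X-M2M-RSC', '2000')
        self.send_header('X-M2M-RI', self.headers.get('X-M2M-RI', ''))
        self.end_headers()

    def log_message(self, format, *args):
        pass                                             # keep the console quiet


def send_test_notification(port: int, request_id: str) -> tuple:
    """Send a verification-style request and return (status, RSC, echoed RI)."""
    body = json.dumps({'m2m:sgn': {'vrq': True}}).encode()
    request = Request(f'http://127.0.0.1:{port}', data=body, method='POST',
                      headers={'Content-Type': 'application/json',
                               'X-M2M-RI': request_id})
    opener = build_opener(ProxyHandler({}))              # ignore any configured proxies
    with opener.open(request, timeout=5) as response:
        return (response.status,
                response.headers['X-M2M-RSC'],
                response.headers['X-M2M-RI'])


server = HTTPServer(('127.0.0.1', 0), EchoHandler)       # port 0: the OS picks a free port
Thread(target=server.serve_forever, daemon=True).start()
result = send_test_notification(server.server_address[1], 'test-123')
server.shutdown()
server.server_close()
print(result)
```

The same client function could be pointed at port 7070 to probe the receiver from this article while it is running.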

Register an Application Entity

The next step is to register an application entity. Please see "How to Register an AE" for further information and a detailed explanation of the code.

The register_AE() function in the file ae.py registers an application entity with the name CMyApplication (provided as a function argument) and the application identifier Nmy-application.example.com. The function returns True if the registration was successful, False otherwise.
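All request functions in this article build nearly the same set of oneM2M headers. As a sketch of how that repetition could be factored out, the hypothetical helper build_headers below (not part of the article's files) assembles the headers from the originator and an optional resource type. The type numbers are the ones used in the Content-Type headers of the listings above.

```python
import random
import string
from typing import Optional

# Resource type numbers as used in the 'ty=' parameter of the listings above
RESOURCE_TYPES = {'ae': 2, 'container': 3, 'contentInstance': 4, 'subscription': 23}


def random_id(length: int = 10) -> str:
    """Return a random alphanumeric request identifier."""
    return ''.join(random.choices(string.ascii_letters + string.digits, k=length))


def build_headers(originator: str, resource_type: Optional[str] = None) -> dict:
    """Build oneM2M HTTP headers (hypothetical helper).

    A Content-Type with the 'ty' parameter is only added for CREATE requests,
    i.e. when a resource type is given.
    """
    headers = {
        'X-M2M-Origin': originator,     # originator of the request
        'X-M2M-RI': random_id(),        # unique request identifier
        'X-M2M-RVI': '4',               # release version indicator
    }
    if resource_type is not None:
        headers['Content-Type'] = f'application/json;ty={RESOURCE_TYPES[resource_type]}'
    return headers


create_headers = build_headers('CMyApplication', 'ae')
delete_headers = build_headers('CMyApplication')
print(create_headers['Content-Type'])
```

A DELETE request, as in unregister_AE(), would use the variant without a resource type.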

Create a <container> resource

The next step is to create a <container> resource under the <AE> resource. It will be used as the resource that will be monitored for changes. The creation of a <container> resource is not much different from the creation of an <AE> resource.

The create_container() function defined in the file container.py creates a <container> resource with the name myContainer (provided as a function argument) under the <AE> resource. The function returns True if the creation was successful, False otherwise.

Our resource tree looks like this:

CSE
└───CMyApplication
    └───myContainer

Create a <subscription> resource

The next step is to create a <subscription> resource under the <container> resource. This resource will be used to configure the resource monitoring and to specify the notification target.

The create_subscription() function defined in the file subscription.py creates a <subscription> resource with the name mySubscription (provided as a function argument) under the <container> resource. The function returns True if the creation was successful, False otherwise.
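The subscription body in subscription.py sets the event notification criteria to 'net': [3]. To our understanding of the notificationEventType enumeration in oneM2M TS-0004, the value 3 stands for "create of direct child resource", which matches the goal of being notified about new <contentInstance> resources. The sketch below names a few of these values and builds the same body. NotificationEventType and build_subscription_body are hypothetical names, and the values should be checked against the release that your CSE implements.

```python
from enum import IntEnum


class NotificationEventType(IntEnum):
    """Some notification event types (assumed values, see oneM2M TS-0004)."""
    UPDATE_OF_RESOURCE = 1
    DELETE_OF_RESOURCE = 2
    CREATE_OF_DIRECT_CHILD_RESOURCE = 3
    DELETE_OF_DIRECT_CHILD_RESOURCE = 4


def build_subscription_body(rn: str, notification_uris: list, event_types: list) -> dict:
    """Build the JSON body of a <subscription> resource (hypothetical helper)."""
    return {
        'm2m:sub': {
            'rn': rn,
            'enc': {
                'net': [int(event) for event in event_types]   # events to monitor
            },
            'nu': list(notification_uris)                      # notification targets
        }
    }


body = build_subscription_body(
    'mySubscription',
    ['http://localhost:7070'],
    [NotificationEventType.CREATE_OF_DIRECT_CHILD_RESOURCE],
)
print(body)
```

Listing more than one event type in 'net' would let the same subscription report, for example, both the creation and the deletion of child resources.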

Our resource tree now looks like this:

CSE
└───CMyApplication
    └───myContainer
        └───mySubscription

When creating the <subscription> resource we also specify the notification target. In our case this is the notification receiver that we started at the beginning of the script. During the creation of the <subscription> resource the CSE sends a verification notification request to the notification target. This simple request verifies that the notification target is reachable. The notification target must respond with an HTTP status code of 200 and a oneM2M response status code of 2000. It must also echo the request identifier that it received in the request header field X-M2M-RI.
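The rules for answering a verification request can be written down as two small functions that do not depend on the HTTP server. This is only a sketch: is_verification_request and build_response_headers are hypothetical names, and the payload is the verification request shown in the example console output.

```python
def is_verification_request(notification: dict) -> bool:
    """Return True if the notification carries the verification flag 'vrq'."""
    return bool(notification.get('m2m:sgn', {}).get('vrq'))


def build_response_headers(request_headers: dict) -> dict:
    """Build the oneM2M response headers for a successfully handled notification.

    The request identifier from the request is echoed unchanged.
    """
    return {
        'X-M2M-RSC': '2000',                        # oneM2M response status code: OK
        'X-M2M-RI': request_headers['X-M2M-RI'],    # echo the request identifier
    }


verification = {'m2m:sgn': {'vrq': True,
                            'sur': '/id-in/sub2711006235793759306',
                            'cr': 'CMyApplication'}}
regular = {'m2m:sgn': {'nev': {'net': 3}, 'sur': '/id-in/sub2711006235793759306'}}

print(is_verification_request(verification))
print(is_verification_request(regular))
print(build_response_headers({'X-M2M-RI': 'abc123'}))
```

Keeping this logic separate from the request handler makes it possible to test it without opening a network port.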

Create a <contentInstance> resource

The last step is to create a <contentInstance> resource under the <container> resource. This will trigger a notification that will be sent to the notification target. The notification receiver will receive the notification, print it to the console, and then send a response back to the CSE.

The resource tree finally looks like this:

CSE
└───CMyApplication
    └───myContainer
        ├───mySubscription
        └───contentInstance

Output

The final output when successfully running the script looks like this:

Example Console Output
Starting notification receiver on port 7070...
AE created successfully
Container created successfully
<= Verification notification request received
<= {'m2m:sgn': {'vrq': True, 'sur': '/id-in/sub2711006235793759306', 'cr': 'CMyApplication'}}
Subscription created successfully
<= Subscription notification request received
<= {'m2m:sgn': {'nev': {'rep': {'m2m:cin': {'con': 'Hello World!', 'ri': 'cin7124901579521707503', 'pi': 'cnt8702778900039637980', 'rn': 'cin_UKmyMwKBXt', 'ct': '20231207T061300,532674', 'lt': '20231207T061300,532674', 'ty': 4, 'cs': 12, 'st': 1, 'et': '20281205T061300,414361'}}, 'net': 3}, 'sur': '/id-in/sub2711006235793759306'}}
Container created successfully
AE deleted successfully
Notification receiver stopped
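The receiver in this article only prints the notification. An application would usually want the content of the new <contentInstance> resource. The sketch below shows one way to pull it out of the payload structure seen in the output above. extract_content is a hypothetical helper, and the sample notification is shortened to the attributes that matter here.

```python
from typing import Optional


def extract_content(notification: dict) -> Optional[str]:
    """Return the 'con' attribute of a notified <contentInstance>, if present.

    Returns None for verification requests and for notifications that do not
    carry a <contentInstance> representation.
    """
    sgn = notification.get('m2m:sgn', {})
    if sgn.get('vrq'):
        return None                                  # verification request, no content
    representation = sgn.get('nev', {}).get('rep', {})
    cin = representation.get('m2m:cin')
    return cin.get('con') if cin else None


# Shortened copy of the notification from the example output
sample = {
    'm2m:sgn': {
        'nev': {
            'rep': {'m2m:cin': {'con': 'Hello World!', 'ty': 4, 'cs': 12}},
            'net': 3
        },
        'sur': '/id-in/sub2711006235793759306'
    }
}

print(extract_content(sample))
```

Such a function could be called in do_POST() right after the request body has been parsed with json.loads().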

Cleaning up

At the end of the script or when an error occurs we unregister the AE and stop the notification server.
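The main script repeats the cleanup calls in every error branch. One possible alternative, shown here only as a sketch, is contextlib.ExitStack from the standard library: a cleanup function is registered after each successful step, and all registered cleanups run in reverse order when the block is left. run_workflow is a hypothetical name, and the lambdas stand in for the functions from this article.

```python
from contextlib import ExitStack


def run_workflow(steps: list) -> bool:
    """Run (action, cleanup) pairs in order and stop at the first failing action.

    An action fails when it returns False. Cleanups of completed steps run in
    reverse order, whether or not a later step fails.
    """
    with ExitStack() as stack:
        for action, cleanup in steps:
            if action() is False:
                return False                  # leaving the block triggers the cleanups
            if cleanup is not None:
                stack.callback(cleanup)
        return True


# Stand-ins for the real functions. The third step simulates a failure.
log = []
steps = [
    (lambda: log.append('start receiver'), lambda: log.append('stop receiver')),
    (lambda: log.append('register AE') or True, lambda: log.append('unregister AE')),
    (lambda: False, None),
]

ok = run_workflow(steps)
print(ok)
print(log)
```

With the real functions, the first pair would be run_notification_receiver and stop_notification_receiver, and the second pair would wrap register_AE and unregister_AE.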

Summary

In this article we showed how to subscribe to notifications. We created a resource structure for an AE and subscribed to events that are raised when <contentInstance> resources are created under a <container> resource. We also showed how to implement a simple notification receiver that receives notifications from a CSE.


by Andreas Kraft, 2023-12-07