
How to Monitor Request Operations

In Monitoring Operations we discussed how oneM2M supports monitoring operations on resources. This article walks through a code example that creates a subscription for monitoring operations.

Preparation

We use Python 3 for the code snippets in this article. Please see Preparations and Prerequisites for further instructions to install the necessary packages.

Re-using the example code

The example code in this article is very similar to the code in the article How to Subscribe to Notifications. We use a simplified resource structure, but extend it with a subscription for monitoring operations. Please refer to that article for how to set up and run the code.

We assume that a CSE runs on the local machine and can receive requests via HTTP at localhost, port 8080.

Monitoring Operations

Resource Structure and Call-Flow

Figure 1 shows the resource structure that we will create in this example. We will create an <AE> resource with a subscription for monitoring operations. The subscription will monitor RETRIEVE operations on the <AE> resource.

CSE
└───CMyApplication
    └───mySubscription
Figure 1: Resource Structure

The call-flow for this recipe is shown in Figure 2:

Figure 2: Call-Flow

The Application entity is depicted as two participants in the call-flow: the Application and the Notification Receiver.
With the first two requests the Application creates the resources and the subscription for monitoring operations. The Notification Receiver receives the notifications from the CSE.
The third request retrieves the <AE> resource and thereby triggers a notification to the Notification Receiver. The fourth request deletes the <AE> resource, which also removes the subscription below it.
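
The four requests of the call-flow can be sketched as plain data. This is only an illustration: the helper name `call_flow` is hypothetical, and the URLs are assembled from the CSE address and the resource name used later in this article.

```python
# Hypothetical helper: lists the four requests of the call-flow in order.
# It only builds a description and does not send anything to the CSE.
def call_flow(cse_url: str, ae_name: str) -> list[tuple[str, str, str]]:
    ae_path = f'{cse_url}/{ae_name}'
    return [
        ('POST',   cse_url, 'create the <AE> resource'),
        ('POST',   ae_path, 'create the <subscription> under the <AE>'),
        ('GET',    ae_path, 'retrieve the <AE>, which triggers the notification'),
        ('DELETE', ae_path, 'delete the <AE> and the subscription below it'),
    ]


for method, url, purpose in call_flow('http://localhost:8080/~/id-in/cse-in', 'CMyApplication'):
    print(f'{method:6} {url}  # {purpose}')
```

The second and third request target the same path: the subscription is created under the <AE>, and the RETRIEVE on that same <AE> is the operation being monitored.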

Source Code

This code and the procedures are similar to the code in the article How to Subscribe to Notifications. The main difference is that here the subscription is created directly under the <AE> resource and carries the attributes for monitoring operations.

Main script
# Import the setup variables
from setup import *                                     

# Import AE functions
from ae import register_AE, unregister_AE, retrieve_AE

# Import subscription function
from subscription import create_subscription 

# Import notification function
from notificationReceiver import run_notification_receiver, stop_notification_receiver

# Start the notification server first
run_notification_receiver()

# Register an AE
if not register_AE(application_name):
    stop_notification_receiver()
    exit()

# Create a <subscription> resource under the <AE> resource
if not create_subscription(application_name, application_path, subscription_name, notificationURIs):
    unregister_AE(application_name)
    stop_notification_receiver()
    exit()

# Retrieve the <AE> resource to trigger a notification
if not retrieve_AE(application_name, application_path):
    unregister_AE(application_name)
    stop_notification_receiver()
    exit()

# Unregister the AE and stop the notification server
unregister_AE(application_name)
stop_notification_receiver()
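
The main script repeats its clean-up calls in every failure branch. As an alternative, the same flow could be sketched with `contextlib.ExitStack` from the Python standard library, which runs registered clean-up callbacks in reverse order when the block is left. The function `run_flow` and its parameters are hypothetical and not part of the original example. The steps are passed in as callables so that the sketch stays independent of the other modules.

```python
from contextlib import ExitStack
from typing import Callable


def run_flow(start_receiver: Callable[[], None],
             stop_receiver: Callable[[], None],
             register: Callable[[], bool],
             unregister: Callable[[], object],
             subscribe: Callable[[], bool],
             retrieve: Callable[[], bool]) -> bool:
    """ Hypothetical variant of the main script.
        Returns True if all steps succeeded, False otherwise.
    """
    with ExitStack() as stack:
        start_receiver()
        stack.callback(stop_receiver)   # always stop the receiver at the end

        if not register():
            return False
        stack.callback(unregister)      # only unregister an AE that was registered

        if not subscribe():
            return False
        return retrieve()
```

With the functions from this article, a call could look like `run_flow(run_notification_receiver, stop_notification_receiver, lambda: register_AE(application_name), lambda: unregister_AE(application_name), ...)`. The clean-up order is the same as in the main script: the AE is unregistered first, then the notification receiver is stopped.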
Setup variables and support functions
import random, string

# Setup variables
cse_url = 'http://localhost:8080/~/id-in/cse-in'            # The url of the CSE
notificationURIs = ['http://localhost:7070']                # The notification target
application_name = 'CMyApplication'                         # The name of the application entity
application_path = cse_url + '/' + application_name         # The path of the application entity
subscription_name = 'mySubscription'                        # The name of the subscription


def randomID() -> str:
    """ Generate a random ID of 10 alphanumeric characters.

        Returns:
            String with a random ID
    """
    return ''.join(random.choices(string.ascii_uppercase + string.digits + string.ascii_lowercase, k = 10))
Functions for working with <AE> resources
from setup import cse_url, randomID
import requests

def register_AE(originator:str) -> bool:
    """ Register an Application Entity

        Args:
            originator: The originator of the request

        Returns:
            bool: True if the AE was registered successfully, False otherwise
    """

    # Set the oneM2M headers for creating the <AE> resource
    headers = {
        'Content-Type': 'application/json;ty=2',    # Type of the resource to be created
        'X-M2M-Origin': originator,                 # unique application entity identifier
        'X-M2M-RI': randomID(),                     # unique request identifier
        'X-M2M-RVI': '4' 
    }

    # Define the <AE> resource
    body = {
        'm2m:ae': {
            'rn': 'CMyApplication',
            'api': 'Nmy-application.example.com',
            'rr': True,
            'srv': ['4']
        }
    }

    # Perform the http request to create the <AE> resource
    response = requests.post(cse_url, headers=headers, json=body)

    # Check the response
    if response.status_code == 201:
        print('AE created successfully')
    else:
        print('Error creating AE: ' + str(response.status_code))
        return False

    return True


# Unregister AE
def unregister_AE(application_name:str) -> bool:
    """ Unregister an Application Entity

        Args:
            application_name: The name of the <AE> resource, also used as the originator of the request

        Returns:
            bool: True if the AE was unregistered successfully, False otherwise
    """

    # Set the oneM2M headers for deleting the <AE> resource
    headers = {
        'X-M2M-Origin': application_name,           # unique application entity identifier
        'X-M2M-RI': randomID(),                     # unique request identifier
        'X-M2M-RVI': '4' 
    }

    # Perform the http request to delete the <AE> resource
    response = requests.delete(cse_url + '/' + application_name, headers=headers)

    # Check the response
    if response.status_code == 200:
        print('AE deleted successfully')
    else:
        print('Error deleting AE: ' + str(response.status_code))
        return False

    return True


def retrieve_AE(originator:str, path:str) -> bool:
    """ Retrieve an Application Entity

        Args:
            originator: The originator of the request
            path: The path of the <AE> resource

        Returns:
            bool: True if the <AE> resource was retrieved successfully, False otherwise
    """
    # Set the oneM2M headers for retrieving the <AE> resource
    headers = {
        'Content-Type': 'application/json',         # Encoding
        'X-M2M-Origin': originator,                 # unique application entity identifier
        'X-M2M-RI': randomID(),                     # unique request identifier
        'X-M2M-RVI': '4' 
    }

    response = requests.get(path, headers=headers)

    # Check the response
    if response.status_code == 200:
        print('AE retrieved successfully')
    else:
        print('Error retrieving AE: ' + str(response.status_code))
        return False

    return True
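
The three functions above build nearly the same set of oneM2M headers. A minimal sketch of a shared helper could look like the following. The names `build_headers` and `random_id` are hypothetical and not part of the original example. Unlike `unregister_AE` above, the helper always sets a `Content-Type` header, which is an assumption of this sketch.

```python
import random
import string
from typing import Optional


def random_id(length: int = 10) -> str:
    """ Generate a random alphanumeric request identifier. """
    return ''.join(random.choices(string.ascii_letters + string.digits, k=length))


def build_headers(originator: str, resource_type: Optional[int] = None) -> dict:
    """ Hypothetical helper that builds the oneM2M HTTP headers used in this article.

        Args:
            originator: The originator of the request
            resource_type: The type of the resource to create (e.g. 2 for <AE>,
                23 for <subscription>), or None for requests that create nothing
    """
    content_type = 'application/json'
    if resource_type is not None:
        content_type += f';ty={resource_type}'
    return {
        'Content-Type': content_type,
        'X-M2M-Origin': originator,
        'X-M2M-RI': random_id(),
        'X-M2M-RVI': '4',
    }


print(build_headers('CMyApplication', 2)['Content-Type'])
```

Each call generates a fresh request identifier, so the returned dictionary should not be reused across requests.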
Functions for working with <subscription> resources
import requests
from setup import randomID

def create_subscription(originator:str, path:str, rn:str, notificationURIs:list[str]) -> bool:
    """ Create a <subscription> resource

        Args:
            originator: The originator of the request
            path: The path of the parent resource
            rn: The resource name of the <subscription> resource
            notificationURIs: The list of notification targets

        Returns:
            bool: True if the <subscription> resource was created successfully, False otherwise
    """
    # Set the oneM2M headers for creating the <subscription> resource
    headers = {
        'Content-Type': 'application/json;ty=23',   # Type of the resource to be created
        'X-M2M-Origin': originator,                 # unique application entity identifier
        'X-M2M-RI': randomID(),                     # unique request identifier
        'X-M2M-RVI': '4' 
    }

    # Define the <subscription> resource
    body = {
        'm2m:sub': {
            'rn': rn,
            'enc': {
                'om': [ {                           # Enable operation monitoring
                    'ops' : 2,                      # Monitor RETRIEVE operations
                    'org': originator               # Originator of the operation
                } ],
            },
            'nct': 3,
            'nu': notificationURIs
        }
    }

    # Perform the http request to create the <subscription> resource
    response = requests.post(path, headers=headers, json=body)

    # Check the response
    if response.status_code == 201:
        print('Subscription created successfully')
    else:
        print('Error creating subscription: ' + str(response.status_code))
        return False

    return True
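
To inspect what is sent to the CSE, the request body can be built separately and printed as JSON. The following is a sketch only: the helper `build_subscription_body` and the constant `RETRIEVE` are hypothetical names, and the attribute values are the ones used in `create_subscription` above.

```python
import json

RETRIEVE = 2    # Operation code used in this article to monitor RETRIEVE operations


def build_subscription_body(rn: str, originator: str, notification_uris: list[str]) -> dict:
    """ Hypothetical helper that builds the <subscription> body without sending it. """
    return {
        'm2m:sub': {
            'rn': rn,
            'enc': {
                # One operation monitoring entry: which operation, by which originator
                'om': [{'ops': RETRIEVE, 'org': originator}],
            },
            'nct': 3,
            'nu': notification_uris,
        }
    }


body = build_subscription_body('mySubscription', 'CMyApplication', ['http://localhost:7070'])
print(json.dumps(body, indent=2))
```

Because `org` is set to the originator of the subscription, only RETRIEVE requests sent by that same originator match the entry.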
Basic Notification Receiver Implementation
from http.server import BaseHTTPRequestHandler, HTTPServer
from threading import Thread
import json

notification_receiver = None

class NotificationReceiver(BaseHTTPRequestHandler):
    """ The notification handler class. 
        This class handles the HTTP requests sent by the CSE to the notification receiver.
    """
    def do_POST(self):
        content_length = int(self.headers['Content-Length'])
        request_id = self.headers['X-M2M-RI']
        post_data = self.rfile.read(content_length)
        data = json.loads(post_data)
        if data['m2m:sgn'].get('vrq'):
            print('<= Verification notification request received')
        else:
            print('<= Subscription notification request received')
        print(f'<= {data}')

        self.send_response(200)
        self.send_header('X-M2M-RSC', '2000')
        self.send_header('X-M2M-RI', request_id)

        self.end_headers() 


    def log_message(self, format:str, *args:int) -> None:
        # Ignore log messages
        pass

def run_notification_receiver(port=7070) -> None:
    """ This function starts the notification receiver on the specified port.
        The notification receiver will run in a separate thread.

        Args:
            port: The port on which the notification server will run
    """
    global notification_receiver
    server_address = ('', port)
    notification_receiver = HTTPServer(server_address, NotificationReceiver)
    print(f'Starting notification receiver on port {port}')
    Thread(target=notification_receiver.serve_forever).start()


def stop_notification_receiver() -> None:
    """ Stop the notification receiver.
    """
    global notification_receiver
    if notification_receiver:
        notification_receiver.shutdown()
        notification_receiver = None
        print('Notification receiver stopped')

Output

The final output when successfully running the script looks like this:

Example Console Output
Starting notification receiver on port 7070
AE created successfully
<= Verification notification request received
<= {'m2m:sgn': {'vrq': True, 'sur': '/id-in/sub977581022253637803', 'cr': 'CMyApplication'}}
Subscription created successfully
AE retrieved successfully
<= Subscription notification request received
<= {'m2m:sgn': {'nev': {'net': 0, 'rep': {'m2m:uri': 'CMyApplication'}, 'om': {'ops': 2, 'org': 'CMyApplication'}}, 'sur': '/id-in/sub977581022253637803'}}
AE deleted successfully
Notification receiver stopped
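
The two notifications in this output can be told apart by their content. The sketch below reads the fields that appear in the output above (`vrq`, `sur`, and the `om` entry inside `nev`). The function name `describe_notification` is hypothetical, and the field layout is taken from this example output only.

```python
def describe_notification(data: dict) -> str:
    """ Hypothetical helper that summarizes a received notification. """
    sgn = data.get('m2m:sgn', {})

    # A verification request is sent when the subscription is created
    if sgn.get('vrq'):
        return f"verification request for {sgn.get('sur')}"

    # An operation monitoring notification carries the operation and its originator
    om = sgn.get('nev', {}).get('om')
    if om:
        return f"operation {om.get('ops')} by {om.get('org')} reported for {sgn.get('sur')}"

    return 'notification without operation monitoring details'


verification = {'m2m:sgn': {'vrq': True, 'sur': '/id-in/sub977581022253637803', 'cr': 'CMyApplication'}}
monitored = {'m2m:sgn': {'nev': {'net': 0, 'rep': {'m2m:uri': 'CMyApplication'}, 'om': {'ops': 2, 'org': 'CMyApplication'}}, 'sur': '/id-in/sub977581022253637803'}}

print(describe_notification(verification))
print(describe_notification(monitored))
```

In the second notification `ops` is 2 and `org` is `CMyApplication`, which matches the values set in the subscription: a RETRIEVE operation performed by the application itself.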

Summary

In this article we showed how to create a subscription for monitoring operations on resources. We used a Python script to create an <AE> resource with a subscription for monitoring RETRIEVE operations. The script also launches a notification receiver that receives the notifications from the CSE.


by Andreas Kraft, 2025-01-07