In "What are Notifications in oneM2M?" we discussed that a notification is a request that a CSE sends to a registered application when a resource is added, updated, or removed.
In this article we want to show how to subscribe to notifications. For this we will create a resource structure and subscribe to events that are raised when <contentInstance> resources are created under a <container> resource.
Preparation
We use Python 3 for the code snippets in this article. Please see Preparations and Prerequisites for further instructions to install the necessary packages.
We assume that a CSE runs on the local machine and can receive requests via HTTP at localhost, port 8080.
Subscribing to Notifications
First, we need to register an AE. Please see How to Register an AE for further information. Under the <AE> resource we create a <container> resource that we will use as the resource that will be monitored for changes. We then create a subscription under the <container> resource. This resource will be used to configure the resource monitoring and to specify the notification target.
In the following sections we will explain the subscription code in the file main.py step-by-step.
# File: main.py

# Import the setup variables
from setup import *

# Import AE functions
from ae import register_AE, unregister_AE

# Import container function
from container import create_container

# Import subscription function
from subscription import create_subscription

# Import contentInstance function
from contentInstance import create_contentInstance

# Import notification function
from notificationReceiver import run_notification_receiver, stop_notification_receiver

# Start the notification server first
run_notification_receiver()

# Register an AE
if not register_AE(application_name):
    stop_notification_receiver()
    exit()

# Create a <container> resource under the <AE> resource
if not create_container(application_name, application_path, container_name):
    unregister_AE(application_name)
    stop_notification_receiver()
    exit()

# Create a <subscription> resource under the <container> resource
if not create_subscription(application_name, container_path, subscription_name, notificationURIs):
    unregister_AE(application_name)
    stop_notification_receiver()
    exit()

# Create a <contentInstance> resource under the <container> resource
if not create_contentInstance(application_name, container_path, 'Hello World!'):
    unregister_AE(application_name)
    stop_notification_receiver()
    exit()

# Unregister the AE and stop the notification server
unregister_AE(application_name)
stop_notification_receiver()
# File: setup.py

import random, string

# Setup variables
cse_url = 'http://localhost:8080/~/id-in/cse-in'            # The url of the CSE
notificationURIs = ['http://localhost:7070']                # The notification target
application_name = 'CMyApplication'                         # The name of the application entity
application_path = cse_url + '/' + application_name         # The path of the application entity
container_name = 'myContainer'                              # The name of the container
container_path = application_path + '/' + container_name    # The path of the container
subscription_name = 'mySubscription'                        # The name of the subscription


def randomID() -> str:
    """ Generate an ID. Prevent certain patterns in the ID.

        Return:
            String with a random ID
    """
    return ''.join(random.choices(string.ascii_uppercase + string.digits + string.ascii_lowercase, k=10))
# File: ae.py

from setup import cse_url, randomID
import requests


def register_AE(originator: str) -> bool:
    """ Register an Application Entity

        Args:
            originator: The originator of the request

        Returns:
            bool: True if the AE was registered successfully, False otherwise
    """

    # Set the oneM2M headers for creating the <AE> resource
    headers = {
        'Content-Type': 'application/json;ty=2',    # Type of the resource to be created
        'X-M2M-Origin': originator,                 # unique application entity identifier
        'X-M2M-RI': randomID(),                     # unique request identifier
        'X-M2M-RVI': '4'
    }

    # Define the <AE> resource
    body = {
        'm2m:ae': {
            'rn': 'CMyApplication',
            'api': 'Nmy-application.example.com',
            'rr': True,
            'srv': ['4']
        }
    }

    # Perform the http request to create the <AE> resource
    response = requests.post(cse_url, headers=headers, json=body)

    # Check the response
    if response.status_code == 201:
        print('AE created successfully')
    else:
        print('Error creating AE: ' + str(response.status_code))
        return False

    return True


# Unregister AE
def unregister_AE(application_name: str) -> bool:
    """ Unregister an Application Entity

        Args:
            application_name: The name of the AE, also used as the originator of the request

        Returns:
            bool: True if the AE was unregistered successfully, False otherwise
    """

    # Set the oneM2M headers for deleting the <AE> resource
    headers = {
        'X-M2M-Origin': application_name,           # unique application entity identifier
        'X-M2M-RI': randomID(),                     # unique request identifier
        'X-M2M-RVI': '4'
    }

    # Perform the http request to delete the <AE> resource
    response = requests.delete(cse_url + '/' + application_name, headers=headers)

    # Check the response
    if response.status_code == 200:
        print('AE deleted successfully')
    else:
        print('Error deleting AE: ' + str(response.status_code))
        return False

    return True
# File: container.py

import requests
from setup import randomID


def create_container(originator: str, path: str, rn: str) -> bool:
    """ Create a <container> resource

        Args:
            originator: The originator of the request
            path: The path of the parent resource
            rn: The resource name of the <container> resource

        Returns:
            bool: True if the <container> resource was created successfully, False otherwise
    """

    # Set the oneM2M headers for creating the <container> resource
    headers = {
        'Content-Type': 'application/json;ty=3',    # Type of the resource to be created
        'X-M2M-Origin': originator,                 # unique application entity identifier
        'X-M2M-RI': randomID(),                     # unique request identifier
        'X-M2M-RVI': '4'
    }

    # Define the <container> resource
    body = {
        'm2m:cnt': {
            'rn': rn
        }
    }

    # Perform the http request to create the <container> resource
    response = requests.post(path, headers=headers, json=body)

    # Check the response
    if response.status_code == 201:
        print('Container created successfully')
    else:
        print('Error creating container: ' + str(response.status_code))
        return False

    return True
Functions for working with <subscription> resources
# File: subscription.py

import requests
from setup import randomID


def create_subscription(originator: str, path: str, rn: str, notificationURIs: list[str]) -> bool:
    """ Create a <subscription> resource

        Args:
            originator: The originator of the request
            path: The path of the parent resource
            rn: The resource name of the <subscription> resource
            notificationURIs: The list of notification targets

        Returns:
            bool: True if the <subscription> resource was created successfully, False otherwise
    """

    # Set the oneM2M headers for creating the <subscription> resource
    headers = {
        'Content-Type': 'application/json;ty=23',   # Type of the resource to be created
        'X-M2M-Origin': originator,                 # unique application entity identifier
        'X-M2M-RI': randomID(),                     # unique request identifier
        'X-M2M-RVI': '4'
    }

    # Define the <subscription> resource
    body = {
        'm2m:sub': {
            'rn': rn,
            'enc': {
                'net': [3]
            },
            'nu': notificationURIs
        }
    }

    # Perform the http request to create the <subscription> resource
    response = requests.post(path, headers=headers, json=body)

    # Check the response
    if response.status_code == 201:
        print('Subscription created successfully')
    else:
        print('Error creating subscription: ' + str(response.status_code))
        return False

    return True
Functions for working with <contentInstance> resources
# File: contentInstance.py

import requests
from setup import randomID


def create_contentInstance(originator: str, path: str, content: str) -> bool:
    """ Create a <contentInstance> resource

        Args:
            originator: The originator of the request
            path: The path of the parent resource
            content: The content of the <contentInstance> resource

        Returns:
            bool: True if the <contentInstance> resource was created successfully, False otherwise
    """

    # Set the oneM2M headers for creating the <contentInstance> resource
    headers = {
        'Content-Type': 'application/json;ty=4',    # Type of the resource to be created
        'X-M2M-Origin': originator,                 # unique application entity identifier
        'X-M2M-RI': randomID(),                     # unique request identifier
        'X-M2M-RVI': '4'
    }

    # Define the <contentInstance> resource
    body = {
        'm2m:cin': {
            'con': content
        }
    }

    # Perform the http request to create the <contentInstance> resource
    response = requests.post(path, headers=headers, json=body)

    # Check the response
    if response.status_code == 201:
        print('Container created successfully')
    else:
        print('Error creating container: ' + str(response.status_code))
        return False

    return True
# File: notificationReceiver.py

from http.server import BaseHTTPRequestHandler, HTTPServer
from threading import Thread
import json

notification_receiver = None


class NotificationReceiver(BaseHTTPRequestHandler):
    """ The notification handler class.
        This class handles the HTTP requests sent by the CSE to the notification receiver.
    """

    def do_POST(self):
        # Read the request identifier and the JSON body of the notification
        content_length = int(self.headers['Content-Length'])
        request_id = self.headers['X-M2M-RI']
        post_data = self.rfile.read(content_length)
        data = json.loads(post_data)

        # A verification request carries the attribute 'vrq'
        if data['m2m:sgn'].get('vrq'):
            print('<= Verification notification request received')
        else:
            print('<= Subscription notification request received')
        print(f'<= {data}')

        # Acknowledge the notification and echo the request identifier
        self.send_response(200)
        self.send_header('X-M2M-RSC', '2000')
        self.send_header('X-M2M-RI', request_id)
        self.end_headers()

    def log_message(self, format: str, *args: int) -> None:
        # Ignore log messages
        pass


def run_notification_receiver(port=7070) -> None:
    """ This function starts the notification receiver on the specified port.
        The notification receiver will run in a separate thread.

        Args:
            port: The port on which the notification server will run
    """
    global notification_receiver
    server_address = ('', port)
    notification_receiver = HTTPServer(server_address, NotificationReceiver)
    print(f'Starting notification receiver on port {port}...')
    Thread(target=notification_receiver.serve_forever).start()


def stop_notification_receiver() -> None:
    """ Stop the notification receiver.
    """
    global notification_receiver
    if notification_receiver:
        notification_receiver.shutdown()
        notification_receiver = None
        print('Notification receiver stopped')
Start the Notification Receiver
The first step in the script is to start the notification receiver. This receiver is a simple HTTP server that listens on port 7070. It is implemented in the file notificationReceiver.py and is started in the main script main.py by calling the run_notification_receiver() function.
A notification receiver is a part of an AE that is able to receive notifications from a CSE. The notification receiver is implemented as an HTTP server that listens on a specific port.
Register an Application Entity
The next step is to register an application entity. Please see How to Register an AE for further information and a detailed explanation of the code.
The register_AE() function in the file ae.py registers an application entity with the name CMyApplication and the application identifier Nmy-application.example.com. The originator of the request is provided as a function argument. The function returns True if the registration was successful, False otherwise.
Create a <container> resource
The next step is to create a <container> resource under the <AE> resource. It will be used as the resource that will be monitored for changes. The creation of a <container> resource is not much different from the creation of an <AE> resource.
The create_container() function defined in the file container.py creates a <container> resource with the name myContainer (provided as a function argument) under the <AE> resource. The function returns True if the creation was successful, False otherwise.
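The similarity between the two requests can be made explicit: the headers differ only in the resource type given in the Content-Type field (2 for <AE>, 3 for <container>). The following is a minimal sketch of a shared header builder. The helper name onem2m_headers is hypothetical and not part of the article's files, which build the headers inline in each function.

```python
import random
import string


def onem2m_headers(originator: str, resource_type: int) -> dict:
    """Build the HTTP headers for a oneM2M CREATE request (hypothetical helper)."""
    # Ten random alphanumeric characters, in the spirit of randomID() in setup.py
    request_id = ''.join(random.choices(string.ascii_letters + string.digits, k=10))
    return {
        'Content-Type': f'application/json;ty={resource_type}',  # type of the resource to create
        'X-M2M-Origin': originator,                               # originator of the request
        'X-M2M-RI': request_id,                                   # unique request identifier
        'X-M2M-RVI': '4',                                         # same value as in the article's requests
    }


ae_headers = onem2m_headers('CMyApplication', 2)         # headers for an <AE>
container_headers = onem2m_headers('CMyApplication', 3)  # headers for a <container>
print(ae_headers['Content-Type'])
print(container_headers['Content-Type'])
```

Apart from the resource type, only the request body and the target path change between the two requests.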
Our resource tree looks like this:
CSE
└───CMyApplication
    └───myContainer
Create a <subscription> resource
The next step is to create a <subscription> resource under the <container> resource. This resource will be used to configure the resource monitoring and to specify the notification target.
The create_subscription() function defined in the file subscription.py creates a <subscription> resource with the name mySubscription (provided as a function argument) under the <container> resource. The function returns True if the creation was successful, False otherwise.
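The subscription body sets 'net' to [3] inside 'enc'. In oneM2M this value is the notification event type for the creation of a direct child resource, which is why creating a <contentInstance> under the <container> triggers a notification. The sketch below names a few event types with an enum to make the body easier to read. The names NotificationEventType and build_subscription are hypothetical helpers for illustration, and only a subset of the event types is listed.

```python
from enum import IntEnum


class NotificationEventType(IntEnum):
    """A subset of the oneM2M notification event types (illustrative helper)."""
    UPDATE_OF_RESOURCE = 1
    DELETE_OF_RESOURCE = 2
    CREATE_OF_DIRECT_CHILD_RESOURCE = 3
    DELETE_OF_DIRECT_CHILD_RESOURCE = 4


def build_subscription(rn: str, notification_uris: list[str],
                       events: list[NotificationEventType]) -> dict:
    """Build the JSON body of a <subscription> resource (hypothetical helper)."""
    return {
        'm2m:sub': {
            'rn': rn,
            'enc': {'net': [int(event) for event in events]},  # events to monitor
            'nu': notification_uris,                            # notification targets
        }
    }


body = build_subscription('mySubscription', ['http://localhost:7070'],
                          [NotificationEventType.CREATE_OF_DIRECT_CHILD_RESOURCE])
print(body)
```

The resulting dictionary has the same content as the body defined in create_subscription().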
When creating the <subscription> resource we also specify the notification target. In our case we use the notification receiver that we started at the beginning of the script. During the creation of the <subscription> resource a verification notification request is sent to the notification target. This is a simple request that is used to verify that the notification target is reachable. The notification target must respond with an HTTP status code of 200 and a oneM2M response status code of 2000. The notification target must also echo the request identifier that was sent in the request header field X-M2M-RI.
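The response rules for a notification can be sketched as a small function that is independent of the HTTP server. It decides whether a request is a verification request and builds the response headers. The function name build_notification_response and the request identifier 'abc123' are assumptions made for this illustration; the article's receiver does the same work inside do_POST().

```python
def build_notification_response(request_headers: dict, notification: dict) -> tuple[int, dict, bool]:
    """Return (HTTP status, response headers, is_verification) for a notification.

    Hypothetical helper that mirrors the response rules described above.
    """
    signal = notification.get('m2m:sgn', {})
    is_verification = bool(signal.get('vrq'))   # 'vrq' marks a verification request
    response_headers = {
        'X-M2M-RSC': '2000',                                  # oneM2M response status code
        'X-M2M-RI': request_headers.get('X-M2M-RI', ''),      # echo the request identifier
    }
    return 200, response_headers, is_verification


# A verification request with an assumed request identifier
status, headers, is_verification = build_notification_response(
    {'X-M2M-RI': 'abc123'},
    {'m2m:sgn': {'vrq': True, 'sur': '/id-in/sub2711006235793759306', 'cr': 'CMyApplication'}},
)
print(status, headers, is_verification)
```

Keeping this logic in a plain function makes it possible to check the response rules without starting a server or a CSE.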
Create a <contentInstance> resource
The last step is to create a <contentInstance> resource under the <container> resource. This will trigger a notification that will be sent to the notification target. The notification receiver will receive the notification, print it to the console, and then send a response back to the CSE.
The final output when successfully running the script looks like this:
Example Console Output
Starting notification receiver on port 7070...
AE created successfully
Container created successfully
<= Verification notification request received
<= {'m2m:sgn': {'vrq': True, 'sur': '/id-in/sub2711006235793759306', 'cr': 'CMyApplication'}}
Subscription created successfully
<= Subscription notification request received
<= {'m2m:sgn': {'nev': {'rep': {'m2m:cin': {'con': 'Hello World!', 'ri': 'cin7124901579521707503', 'pi': 'cnt8702778900039637980', 'rn': 'cin_UKmyMwKBXt', 'ct': '20231207T061300,532674', 'lt': '20231207T061300,532674', 'ty': 4, 'cs': 12, 'st': 1, 'et': '20281205T061300,414361'}}, 'net': 3}, 'sur': '/id-in/sub2711006235793759306'}}
Container created successfully
AE deleted successfully
Notification receiver stopped
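The receiver in this article only prints the notification. An application will usually want the content of the new <contentInstance>, which is found under 'm2m:sgn', 'nev', 'rep', 'm2m:cin', 'con' in the output above. The following sketch extracts it; the function name extract_content is hypothetical, and the sample notification is shortened from the console output.

```python
from typing import Optional


def extract_content(notification: dict) -> Optional[str]:
    """Return the 'con' attribute of a notified <contentInstance>, or None.

    Hypothetical helper; returns None for verification requests and for
    notifications that do not carry a <contentInstance> representation.
    """
    signal = notification.get('m2m:sgn', {})
    representation = signal.get('nev', {}).get('rep', {})
    content_instance = representation.get('m2m:cin')
    if content_instance is None:
        return None
    return content_instance.get('con')


# Shortened copy of the notification from the console output
notification = {'m2m:sgn': {'nev': {'rep': {'m2m:cin': {'con': 'Hello World!', 'rn': 'cin_UKmyMwKBXt', 'ty': 4}},
                                    'net': 3},
                            'sur': '/id-in/sub2711006235793759306'}}
verification = {'m2m:sgn': {'vrq': True, 'sur': '/id-in/sub2711006235793759306', 'cr': 'CMyApplication'}}

print(extract_content(notification))   # Hello World!
print(extract_content(verification))   # None
```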
Cleaning up
At the end of the script or when an error occurs we unregister the AE and stop the notification server.
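The script repeats the cleanup calls after every step. One alternative, shown here only as a sketch, is to run the steps inside try/finally so that the cleanup runs once, whether a step fails or raises an exception. The function run_steps and the lambda stand-ins are hypothetical; in the real script the steps would be calls such as register_AE(application_name) and the cleanup would be unregister_AE(application_name) and stop_notification_receiver().

```python
def run_steps(steps, cleanup) -> bool:
    """Run each step in order and always run the cleanup actions afterwards.

    steps:   callables that return True on success and False on failure
    cleanup: callables that are run in order, even after a failure or an exception
    """
    try:
        # all() stops at the first step that returns False
        return all(step() for step in steps)
    finally:
        for action in cleanup:
            action()


# Stand-ins for the real functions; each one records that it was called
log = []
ok = run_steps(
    steps=[
        lambda: log.append('register') or True,
        lambda: log.append('container') or False,      # simulated failure
        lambda: log.append('subscription') or True,    # never reached
    ],
    cleanup=[
        lambda: log.append('unregister'),
        lambda: log.append('stop receiver'),
    ],
)
print(ok, log)
```

Unlike the script, this variant also calls the unregister step when the registration itself failed. With the functions from this article that would only print an error message for the failed delete request.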
Summary
In this article we showed how to subscribe to notifications. We created a resource structure for an AE and subscribed to events that are raised when <contentInstance> resources are created under a <container> resource. We also showed how to implement a simple notification receiver that receives notifications from a CSE.