In Monitoring Operations we discussed how oneM2M supports monitoring operations on resources. In this article we want to show a code example that demonstrates how to create a subscription for monitoring operations.
Preparation
We use Python 3 for the code snippets in this article. Please see Preparations and Prerequisites for further instructions to install the necessary packages.
Re-using the example code
The example code we will use in this article is very similar to the code in the article How to Subscribe to Notifications. We will use a simplified resource structure, but extend it with a subscription for monitoring operations. Please refer to that article for the setup and running the code.
We assume that a local CSE runs on the local machine and can receive requests via http at localhost port 8080.
Monitoring Operations
Resource Structure and Call-Flow
Figure 1 shows the resource structure that we will create in this example. We will create an <AE> resource with a subscription for monitoring operations. The subscription will monitor RETRIEVE operations on the <AE> resource.
CSE
└───CMyApplication
└───mySubscription
The call-flow for this recipe is shown in Figure 2:
The Application entity is depicted as two participants in the call-flow: the Application and the Notification Receiver.
With the first two requests the Application creates the resources and the subscription for monitoring operations. The Notification Receiver receives the notifications from the CSE.
The third request retrieves the <AE> resource and thereby triggers a notification to the Notification Receiver.
The fourth request deletes the <AE> resource and cleans up the resources.
Source Code
This code and the procedures are similar to the code in the article How to Subscribe to Notifications. The main difference is that in this example we create a subscription for monitoring operations with the respective attributes and directly under the <AE> resource.
# Import the setup variablesfromsetupimport*# Import AE functionsfromaeimportregister_AE,unregister_AE,retrieve_AE# Import subscription functionfromsubscriptionimportcreate_subscription# Import notification functionfromnotificationReceiverimportrun_notification_receiver,stop_notification_receiver# Start the notification server firstrun_notification_receiver()# Register an AEifregister_AE(application_name)==False:stop_notification_receiver()exit()# Create a <subscription> resource under the <container> resourceifcreate_subscription(application_name,application_path,subscription_name,notificationURIs)==False:unregister_AE(application_name)stop_notification_receiver()exit()# Retrieve the <container> resourceifretrieve_AE(application_name,application_path)==False:unregister_AE(application_name)stop_notification_receiver()exit()# Unregister the AE and stop the notification serverunregister_AE(application_name)stop_notification_receiver()
importrandom,string# Setup variablescse_url='http://localhost:8080/~/id-in/cse-in'# The url of the CSEnotificationURIs=['http://localhost:7070']# The notification targetapplication_name='CMyApplication'# The name of the application entityapplication_path=cse_url+'/'+application_name# The path of the application entitysubscription_name='mySubscription'# The name of the subscriptiondefrandomID()->str:""" Generate an ID. Prevent certain patterns in the ID. Return: String with a random ID """return''.join(random.choices(string.ascii_uppercase+string.digits+string.ascii_lowercase,k=10))
fromsetupimportcse_url,randomIDimportrequestsdefregister_AE(originator:str)->bool:""" Register an Application Entity Args: originator: The originator of the request Returns: bool: True if the AE was registered successfully, False otherwise """# Set the oneM2M headers for creating the <AE> resourceheaders={'Content-Type':'application/json;ty=2',# Type of the resource to be created'X-M2M-Origin':originator,# unique application entity identifier'X-M2M-RI':randomID(),# unique request identifier'X-M2M-RVI':'4'}# Define the <AE> resourcebody={'m2m:ae':{'rn':'CMyApplication','api':'Nmy-application.example.com','rr':True,'srv':['4']}}# Perform the http request to create the <AE> resourceresponse=requests.post(cse_url,headers=headers,json=body)# Check the responseifresponse.status_code==201:print('AE created successfully')else:print('Error creating AE: '+str(response.status_code))returnFalsereturnTrue# Unregister AEdefunregister_AE(application_name:str)->bool:""" Unregister an Application Entity Args: originator: The originator of the request Returns: bool: True if the AE was unregistered successfully, False otherwise """# Set the oneM2M headers for deleting the <AE> resourceheaders={'X-M2M-Origin':application_name,# unique application entity identifier'X-M2M-RI':randomID(),# unique request identifier'X-M2M-RVI':'4'}# Perform the http request to delete the <AE> resourceresponse=requests.delete(cse_url+'/'+application_name,headers=headers)# Check the responseifresponse.status_code==200:print('AE deleted successfully')else:print('Error deleting AE: '+str(response.status_code))returnFalsereturnTruedefretrieve_AE(originator:str,path:str)->bool:""" Retrieve an Application Entity Args: originator: The originator of the request path: The path of the <AE> resource """# Set the oneM2M headers for retrieving the <AE> resourceheaders={'Content-Type':'application/json',# Encoding'X-M2M-Origin':originator,# unique application entity identifier'X-M2M-RI':randomID(),# unique request identifier'X-M2M-RVI':'4'}response=requests.get(path,headers=headers)# Check the responseifresponse.status_code==200:print('AE retrieved successfully')else:print('Error creating container: '+str(response.status_code))returnFalsereturnTrue
Functions for working with <subscription> resources
importrequestsfromsetupimportrandomIDdefcreate_subscription(originator:str,path:str,rn:str,notificationURIs:list[str])->bool:""" Create a <subscription> resource Args: originator: The originator of the request path: The path of the parent resource rn: The resource name of the <subscription> resource Returns: bool: True if the <subscription> resource was created successfully, False otherwise """# Set the oneM2M headers for creating the <subscription> resourceheaders={'Content-Type':'application/json;ty=23',# Type of the resource to be created'X-M2M-Origin':originator,# unique application entity identifier'X-M2M-RI':randomID(),# unique request identifier'X-M2M-RVI':'4'}# Define the <subscription> resourcebody={'m2m:sub':{'rn':rn,'enc':{'om':[{# Enable operation monitoring'ops':2,# Monitor RETRIEVE operations'org':originator# Originator of the operation}],},'nct':3,'nu':notificationURIs}}# Perform the http request to create the <subscription> resourceresponse=requests.post(path,headers=headers,json=body)# Check the responseifresponse.status_code==201:print('Subscription created successfully')else:print('Error creating subscription: '+str(response.status_code))returnFalsereturnTrue
fromhttp.serverimportBaseHTTPRequestHandler,HTTPServerfromthreadingimportThreadimportjsonnotification_receiver=NoneclassNotificationReceiver(BaseHTTPRequestHandler):""" The notification handler class. This class handles the HTTP requests sent by the CSE to the notification receiver. """defdo_POST(self):content_length=int(self.headers['Content-Length'])request_id=self.headers['X-M2M-RI']post_data=self.rfile.read(content_length)data=json.loads(post_data)ifdata['m2m:sgn'].get('vrq'):print('<= Verification notification request received')else:print('<= Subscription notification request received')print(f'<= {data}')self.send_response(200)self.send_header('X-M2M-RSC','2000')self.send_header('X-M2M-RI',request_id)self.end_headers()deflog_message(self,format:str,*args:int)->None:# Ignore log messagespassdefrun_notification_receiver(port=7070)->None:""" This function starts the notification receiver on the specified port. The notification receiver will run in a separate thread. Args: handler_class: The HTTP request handler class port: The port on which the notification server will run """globalnotification_receiverserver_address=('',port)notification_receiver=HTTPServer(server_address,NotificationReceiver)print(f'Starting notification receiver on port {port}')Thread(target=notification_receiver.serve_forever).start()defstop_notification_receiver()->None:""" Stop the notification receiver. """globalnotification_receiverifnotification_receiver:notification_receiver.shutdown()notification_receiver=Noneprint('Notification receiver stopped')
Output
The final output when successfully running the script looks like this:
Example Console Output
Starting notification receiver on port 7070
AE created successfully
<= Verification notification request received
<= {'m2m:sgn': {'vrq': True, 'sur': '/id-in/sub977581022253637803', 'cr': 'CMyApplication'}}
Subscription created successfully
AE retrieved successfully
<= Subscription notification request received
<= {'m2m:sgn': {'nev': {'net': 0, 'rep': {'m2m:uri': 'CMyApplication'}, 'om': {'ops': 2, 'org': 'CMyApplication'}}, 'sur': '/id-in/sub977581022253637803'}}
AE deleted successfully
Notification receiver stopped
Summary
In this article we showed how to create a subscription for monitoring operations on resources. We used a Python script to create an <AE> resource with a subscription for monitoring RETRIEVE operations. The script also launches a notification receiver that receives the notifications from the CSE.