I am trying to send messages to an Azure Event Hub using Python and the REST API. After some unsuccessful experiments I found working code (see below), but I also want to be able to choose which partition the event is sent to.
Is this possible using the REST API, and if so, how can it be done?
import json
from datetime import datetime
from multiprocessing import Pool
from azure.servicebus.servicebusservice import ServiceBusService, ServiceBusSASAuthentication
from azure.http import (
HTTPRequest,
HTTPError
)
from azure.http.httpclient import _HTTPClient
# Connection settings handed to EventHubClient by send_monitoring_event().
# The values below are placeholders.
EVENT_HUB_HOST = "mysecrethub.servicebus.windows.net"
EVENT_HUB_NAME = "secerthub-name"
# Name and value of the SAS key used to sign each request.
KEYNAME = "senderkey"
KEYVALUE = "keyvalue"
# Extra (name, value) header tuples appended to every request; empty by default.
EXTRA_HEADERS = []
# Used by main() as both the worker-pool size and the number of events sent
# (one per value in range(NUM_OF_PARTITIONS)).
# NOTE(review): presumably matches the hub's partition count - confirm.
NUM_OF_PARTITIONS = 16
class EventHubClient(object):
    """Minimal REST sender for a single Azure Event Hub, signed with a SAS key.

    Each call to sendMessage() builds one HTTPS POST, signs it with
    ServiceBusSASAuthentication and sends it through the SDK's _HTTPClient.
    """

    def __init__(self, host, hubname, keyname, keyvalue):
        """Store the connection settings; no network traffic happens here.

        host     -- namespace host name, e.g. "<namespace>.servicebus.windows.net"
        hubname  -- name of the event hub inside that namespace
        keyname  -- name of the shared access (SAS) key
        keyvalue -- value of the shared access (SAS) key
        """
        self._host = host
        self._hub = hubname
        self._keyname = keyname
        self._key = keyvalue

    def _message_path(self, partition_id=None):
        """Return the request path, optionally addressing one partition."""
        if partition_id is None:
            return "/%s/messages?api-version=2014-01" % (self._hub,)
        return "/%s/partitions/%s/messages?api-version=2014-01" % (
            self._hub, partition_id)

    @staticmethod
    def _as_bytes(body):
        """Return *body* as bytes, encoding text as UTF-8.

        Content-Length must count bytes, so text is encoded before it is
        measured and sent.
        """
        if isinstance(body, bytes):
            return body
        return body.encode('utf-8')

    def sendMessage(self, body, partition=None, additional_headers=None,
                    partition_id=None):
        """POST one event to the hub and return the HTTP status code.

        body               -- event payload (bytes, or text that is sent as UTF-8)
        partition          -- optional partition *key*; sent in the
                              BrokerProperties header as {"PartitionKey": ...}
        additional_headers -- optional iterable of (name, value) tuples
        partition_id       -- optional partition *id*; when given, the event is
                              posted to /<hub>/partitions/<id>/messages so it
                              lands in exactly that partition

        Raises ValueError when both partition and partition_id are supplied.
        An HTTPError raised by the transport is not propagated; its status
        code is returned instead.
        """
        # A partition key and an explicit partition id are two competing ways
        # of routing the same event, so reject the ambiguous combination early.
        if partition is not None and partition_id is not None:
            raise ValueError(
                "partition (key) and partition_id are mutually exclusive")

        httpclient = _HTTPClient(service_instance=self)
        authentication = ServiceBusSASAuthentication(self._keyname, self._key)

        request = HTTPRequest()
        request.method = "POST"
        request.host = self._host
        request.protocol_override = "https"
        request.path = self._message_path(partition_id)
        request.body = self._as_bytes(body)
        request.headers.append(
            ('Content-Type', 'application/atom+xml;type=entry;charset=utf-8'))
        if additional_headers is not None:
            for item in additional_headers:
                request.headers.append(item)
        if partition is not None:
            value = json.dumps({'PartitionKey': partition})
            request.headers.append(('BrokerProperties', value))

        # Sign first, then add Content-Length, keeping the original header order.
        authentication.sign_request(request, httpclient)
        request.headers.append(('Content-Length', str(len(request.body))))

        try:
            status = httpclient.perform_request(request).status
        except HTTPError as ex:
            status = ex.status
        return status
def prepare_message(appid, sessionid, partitionKey=None, SessionEllapsed=None, DeviceOs=None):
    """Return a JSON string describing one "MonitorEvent".

    appid           -- stored as Attributes["AppId"]
    sessionid       -- accepted for interface compatibility; not used in the message
    partitionKey    -- when given, written (as a string) to both "PartitionKey"
                       and "PartitionId", and unchanged to Attributes["ItemId"]
    SessionEllapsed -- optional, stored as Attributes["SessionEllapsed"]
    DeviceOs        -- optional, stored as Attributes["DeviceOs"]

    Attributes["SessionStarted"] is the current local time formatted as
    "YYYY-MM-DD HH:MM:SS.mmm".
    """
    # strftime('%f') always yields six digits, so dropping the last three
    # leaves milliseconds. str(datetime.now()) omits the fraction entirely
    # when microsecond == 0, in which case slicing would cut into the seconds.
    started = datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]

    attributes = {"AppId": appid, "SessionStarted": started}
    if SessionEllapsed is not None:
        attributes['SessionEllapsed'] = SessionEllapsed
    if DeviceOs is not None:
        attributes['DeviceOs'] = DeviceOs

    message = {"Name": "MonitorEvent"}
    if partitionKey is not None:
        message["PartitionKey"] = str(partitionKey)
        message["PartitionId"] = str(partitionKey)
        attributes['ItemId'] = partitionKey
    message['Attributes'] = attributes
    return json.dumps(message)
def send_monitoring_event(partition):
    """Build one monitoring event for *partition*, send it, return the HTTP status.

    The partition value is embedded in the message body (via prepare_message)
    and in the DeviceOs attribute ('Monitor<partition>').
    """
    hubClient = EventHubClient(EVENT_HUB_HOST, EVENT_HUB_NAME, KEYNAME, KEYVALUE)
    appid = 1
    # '%m' is the month; the earlier '%Y%M%d' repeated the minute in its place.
    sendertime = datetime.now().strftime('%Y%m%d-%H%M%S')
    message = prepare_message(appid, sendertime, partitionKey=partition,
                              SessionEllapsed=1, DeviceOs='Monitor' + str(partition))
    # NOTE(review): partition=None means no BrokerProperties/PartitionKey header
    # is sent, so the partition only appears inside the JSON body. Confirm
    # whether the key should be forwarded to sendMessage() here.
    return hubClient.sendMessage(message, partition=None, additional_headers=EXTRA_HEADERS)
def main():
    """Send one monitoring event per partition in parallel and print the statuses.

    A pool of NUM_OF_PARTITIONS worker processes maps send_monitoring_event
    over range(NUM_OF_PARTITIONS); the resulting list of HTTP status codes is
    printed.
    """
    pool = Pool(processes=NUM_OF_PARTITIONS)
    try:
        statuses = pool.map(send_monitoring_event, range(NUM_OF_PARTITIONS))
    finally:
        # Release the worker processes even when a send raises.
        pool.close()
        pool.join()
    # Parenthesised single-argument form prints the same on Python 2 and 3.
    print(statuses)


if __name__ == '__main__':
    main()
Srgrn source
share