summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorYves Fischer <yvesf-git@xapek.org>2018-08-13 13:11:47 +0200
committerYves Fischer <yvesf-git@xapek.org>2018-08-13 13:59:52 +0200
commita4a1e5be3d19434a418b0358e1bb75d272392816 (patch)
tree1fb61468ee07f2db0446b27e9f5b38863a98af1b
downloadinfluxdb-udp-inserter-a4a1e5be3d19434a418b0358e1bb75d272392816.tar.gz
influxdb-udp-inserter-a4a1e5be3d19434a418b0358e1bb75d272392816.zip
prototype
-rw-r--r--.gitignore2
-rw-r--r--README.md9
-rw-r--r--influxdb_udp_inserter/__init__.py148
-rw-r--r--influxdb_udp_inserter/sample_message_format.js18
-rw-r--r--influxdb_udp_inserter/server_async.py74
-rwxr-xr-xserver.py29
-rw-r--r--test_client.py20
-rw-r--r--test_serialize.py42
8 files changed, 342 insertions, 0 deletions
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..64a7365
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,2 @@
+/.idea
+__pycache__
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..a58fed2
--- /dev/null
+++ b/README.md
@@ -0,0 +1,9 @@
+# UDP Wire protocol for influxdb
+
+Key points:
+
+- UDP datagramm based
+- No ack's, no transport encryption
+- No transport compression (there is not a huge gain and lack of support in micropython)
+- Messages are authenticated by hash of secret value
+- All client-relevant code is compatible with micropython \ No newline at end of file
diff --git a/influxdb_udp_inserter/__init__.py b/influxdb_udp_inserter/__init__.py
new file mode 100644
index 0000000..9c51951
--- /dev/null
+++ b/influxdb_udp_inserter/__init__.py
@@ -0,0 +1,148 @@
+try:
+ import struct
+except ImportError:
+ import ustruct as struct
+
+try:
+ import socket
+except ImportError:
+ import usocket as socket
+
+
+class Struct:
+ def __init__(self, fmt):
+ self.fmt = fmt
+ self.size = struct.calcsize(self.fmt)
+
+ def pack(self, value):
+ return struct.pack(self.fmt, value)
+
+ def unpack(self, data):
+ return struct.unpack(self.fmt, data)[0]
+
+
+UINT8 = Struct('B')
+UINT16 = Struct('H')
+UINT32 = Struct('I')
+
+try:
+ import hashlib
+except ImportError:
+ import uhashlib as hashlib
+
+
+class SerializerFactory:
+ def __init__(self):
+ self.message_formats = {}
+
+ def add_message_format(self, description):
+ if not 'identifier' in description:
+ raise Exception('Missing required option "identifier"')
+ identifier = description['identifier']
+ if not isinstance(identifier, bytes):
+ identifier = bytes(identifier)
+ if identifier in self.message_formats:
+ raise Exception('There is already a message defined with identifier', description['identifier'])
+ self.message_formats[identifier] = description
+
+ def get_serializer(self, identifier):
+ if not isinstance(identifier, bytes):
+ identifier = bytes(identifier)
+ if identifier in self.message_formats:
+ return MessageSerializer.from_config(self.message_formats[identifier])
+ else:
+ raise Exception('No message format with identifier', identifier)
+
+
+class MessageSerializer:
+ @staticmethod
+ def _make_fields(fields):
+ for field_name, *values in fields:
+ v = []
+ for name, value in values:
+ if value.lower() == 'uint16':
+ v += [(name, UINT16)]
+ elif value.lower() == 'uint32':
+ v += [(name, UINT32)]
+ elif value.lower() == 'uint8':
+ v += [(name, UINT8)]
+ else:
+ raise Exception('Datatype not supported', value)
+ result = [field_name] + v
+ yield result
+
+ @staticmethod
+ def from_config(config):
+ identifier = bytes(config['identifier'])
+ secret = bytes(config['secret'])
+ database = config['database']
+ fields = list(MessageSerializer._make_fields(config['fields']))
+ return MessageSerializer(identifier, database, secret, fields)
+
+ def __init__(self, identifier, database, secret, fields):
+ self.identifier = identifier
+ self.database = database
+ self.secret = secret
+ self.fields = fields
+ self.size = sum(map(lambda f: sum(v[1].size for v in f[1:]), self.fields))
+
+ def serialize(self, data: dict):
+ if len(data) != len(self.fields):
+ raise Exception()
+ if set(data.keys()) != set(map(lambda v: v[0], self.fields)):
+ raise Exception("Data does not match schema")
+
+ payload = bytes()
+
+ for config_name, *config_values in self.fields:
+ data_values = data[config_name]
+ if set(data_values.keys()) != set(map(lambda kv: kv[0], config_values)):
+ raise Exception("inconsistent data for ", config_name, data_values.keys(), dict(config_values).keys())
+
+ for config_sub_name, config_struct in config_values:
+ payload += config_struct.pack(data_values[config_sub_name])
+
+ buf = bytes(self.identifier[0:3]) + payload
+
+ hash = hashlib.sha256(buf + self.secret).digest()
+ buf += hash[0:6]
+
+ return buf
+
+ def deserialize(self, data):
+ if len(data) < 3 + 6:
+ raise Exception('Message of wrong size', len(data))
+
+ hash = hashlib.sha256(data[:-6] + self.secret).digest()
+ if hash[0:6] != data[-6:]:
+ raise Exception("Failed to authenticate message")
+
+ payload = data[3:-6]
+ if len(payload) != self.size:
+ raise Exception('Message of wrong payload size', len(payload))
+
+ result = []
+ i = 0
+ for config_name, *config_values in self.fields:
+ result_field = {}
+
+ for config_sub_name, config_struct in config_values:
+ window = payload[i:i + config_struct.size]
+ result_field[config_sub_name] = config_struct.unpack(window)
+ i += config_struct.size
+
+ result += [(config_name, result_field)]
+
+ return dict(result)
+
+
+def send(host: str, port: int, serializer: MessageSerializer, data: dict):
+ serialized_data = serializer.serialize(data)
+
+ sockaddr = socket.getaddrinfo(host, port)[0][-1]
+ s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
+ try:
+ s.connect(sockaddr)
+ s.send(serialized_data)
+ finally:
+ s.close()
diff --git a/influxdb_udp_inserter/sample_message_format.js b/influxdb_udp_inserter/sample_message_format.js
new file mode 100644
index 0000000..41d37e8
--- /dev/null
+++ b/influxdb_udp_inserter/sample_message_format.js
@@ -0,0 +1,18 @@
+{
+ "identifier": [1, 1, 1],
+ "secret": [12, 23, 23, 11, 244, 23, 222, 123],
+ "database": "data",
+ "fields": [
+ ["inverter0.PVVoltage1", ["min", "uint16"], ["max", "uint16"], ["last","uint16"]],
+ ["inverter0.PVVoltage2", ["min", "uint16"], ["max", "uint16"], ["last","uint16"]],
+ ["inverter0.PVVoltage3", ["min", "uint16"], ["max", "uint16"], ["last","uint16"]],
+ ["inverter0.PVCurrent1", ["min", "uint16"], ["max", "uint16"], ["last","uint16"]],
+ ["inverter0.PVCurrent2", ["min", "uint16"], ["max", "uint16"], ["last","uint16"]],
+ ["inverter0.PVCurrent3", ["min", "uint16"], ["max", "uint16"], ["last","uint16"]],
+ ["inverter0.GridVoltage", ["min", "uint16"], ["max", "uint16"], ["last","uint16"]],
+ ["inverter0.GridFrequency", ["min", "uint16"], ["max", "uint16"], ["last","uint16"]],
+ ["inverter0.SmoothedInstantEnergyProduction", ["min", "uint16"], ["max", "uint16"], ["last","uint16"]],
+ ["inverter0.LatestEvent", ["value","uint16"]],
+ ["inverter0.LatestEventModule", ["value","uint16"]]
+ ]
+} \ No newline at end of file
diff --git a/influxdb_udp_inserter/server_async.py b/influxdb_udp_inserter/server_async.py
new file mode 100644
index 0000000..5e2933a
--- /dev/null
+++ b/influxdb_udp_inserter/server_async.py
@@ -0,0 +1,74 @@
+from . import SerializerFactory
+
+from influxdb import line_protocol
+
+import logging
+import json
+import asyncio
+import aiohttp
+
+
+class UdpInserterProtocol(asyncio.DatagramProtocol):
+ def __init__(self, factory: SerializerFactory, influx_url: str):
+ self.factory = factory
+ self.influx_url = influx_url
+ self.transport: asyncio.Transport = None
+
+ def connection_made(self, transport):
+ self.transport = transport
+
+ def datagram_received(self, data, addr):
+ logging.info('Received %s bytes: %r(...) from %s', len(data), data[0:3], addr)
+ if len(data) < 7: return
+
+ serializer = self.factory.get_serializer(data[0:3])
+ mesg = serializer.deserialize(data)
+
+ influxdb_points = []
+ for key, value in mesg.items():
+ influxdb_points.append({
+ 'measurement': key,
+ 'tags': {},
+ 'fields': dict(value)
+ })
+
+ post_data = line_protocol.make_lines({'points': influxdb_points}).encode()
+
+ asyncio.ensure_future(send(self.influx_url + '?db=' + serializer.database, post_data))
+
+
+async def send(url, data):
+ logging.info("Send data: %r", data)
+ async with aiohttp.ClientSession() as client_session:
+ async with client_session.post(url,
+ headers={aiohttp.hdrs.CONTENT_TYPE: 'application/x-www-form-urlencoded'},
+ data=data) as http_resp:
+ body = await http_resp.read()
+ logging.info("Received response %s (0..200): %r", http_resp.status, body[0:200])
+ http_resp.close()
+
+
+def create_endpoint(factory: SerializerFactory, influx_url: str, *args, **kwargs):
+ loop = asyncio.get_event_loop()
+ listen = loop.create_datagram_endpoint(lambda: UdpInserterProtocol(factory, influx_url), *args, **kwargs)
+ return listen
+
+
+class Server:
+ def __init__(self, influx_url: str, *args, **kwargs):
+ self.influx_url = influx_url
+ self.serializer_factory = SerializerFactory()
+ self.listen = create_endpoint(self.serializer_factory, self.influx_url, *args, **kwargs)
+
+ def run(self):
+ loop = asyncio.get_event_loop()
+ transport, protocol = loop.run_until_complete(self.listen)
+ try:
+ loop.run_forever()
+ finally:
+ transport.close()
+ loop.close()
+
+ def read_message_format_file(self, path):
+ config = json.load(open(path))
+ self.serializer_factory.add_message_format(config)
diff --git a/server.py b/server.py
new file mode 100755
index 0000000..454f4a6
--- /dev/null
+++ b/server.py
@@ -0,0 +1,29 @@
+#!/usr/bin/env python3
+from influxdb_udp_inserter.server_async import Server
+
+import argparse
+import glob
+import logging
+
+def main():
+ logging.basicConfig(level=logging.INFO)
+ parser = argparse.ArgumentParser()
+ parser.add_argument('--url', nargs=1, metavar='URL', help='Influxdb URL for Line Protocol',
+ required=True)
+ parser.add_argument('--formats', nargs=1, action='append', metavar='PATTERN',
+ help='Glob pattern to look for message formats')
+ parser.add_argument('--port', nargs=1, metavar='PORT', help='List on UDP port number', default=9999)
+ args = parser.parse_args()
+
+ s = Server(args.url[0], local_addr=('0.0.0.0', args.port))
+
+ for pattern in args.formats:
+ for filepath in glob.glob(pattern[0]):
+ logging.info("Add message format from file: %s", filepath)
+ s.read_message_format_file(filepath)
+
+ s.run()
+
+
+if __name__ == '__main__':
+ main()
diff --git a/test_client.py b/test_client.py
new file mode 100644
index 0000000..a4c416f
--- /dev/null
+++ b/test_client.py
@@ -0,0 +1,20 @@
+from influxdb_udp_inserter import send, MessageSerializer
+
+try:
+ import json
+except ImportError:
+ import ujson as json
+
+with open('influxdb_udp_inserter/sample_message_format.js') as fp:
+ serializer = MessageSerializer.from_config(json.load(fp))
+
+
+
+fake_data = {}
+for field_name, *values in serializer.fields:
+ fake_data[field_name] = {}
+ for name, value in values:
+ fake_data[field_name][name] = sum(map(ord, field_name+name)) % 0xff
+send('0.0.0.0', 9999, serializer, fake_data)
+
+print('done')
diff --git a/test_serialize.py b/test_serialize.py
new file mode 100644
index 0000000..1f5d5b0
--- /dev/null
+++ b/test_serialize.py
@@ -0,0 +1,42 @@
+from influxdb_udp_inserter import SerializerFactory
+
+def test():
+ try:
+ import os
+ except ImportError:
+ import uos as os
+ try:
+ import json
+ except ImportError:
+ import ujson as json
+
+ with open('influxdb_udp_inserter/sample_message_format.js') as fp:
+ message_format = json.load(fp)
+
+ factory = SerializerFactory()
+ factory.add_message_format(message_format)
+
+ identifier = bytes((0x01, 0x01, 0x01))
+ fake_data = {}
+ for field_name, *values in factory.get_serializer(identifier).fields:
+ fake_data[field_name] = {}
+ for name, value in values:
+ fake_data[field_name][name] = sum(map(ord, field_name+name)) % 0xff
+
+ print(fake_data)
+ serialized = factory.get_serializer(identifier).serialize(fake_data)
+ print(serialized)
+
+ print("{}byte => 360 * 24 * 12 => {}Mb".format(len(serialized), (len(serialized) * 360 * 24 * 12) / 1024 / 1024))
+
+ result = factory.get_serializer(identifier).deserialize(serialized)
+ print(result)
+
+ if fake_data != result:
+ raise Exception()
+ else:
+ print("Input/Output verified correctly")
+
+
+if 1==1:
+ test()