diff options
author | Yves Fischer <yvesf-git@xapek.org> | 2018-08-13 13:11:47 +0200 |
---|---|---|
committer | Yves Fischer <yvesf-git@xapek.org> | 2018-08-13 13:59:52 +0200 |
commit | a4a1e5be3d19434a418b0358e1bb75d272392816 (patch) | |
tree | 1fb61468ee07f2db0446b27e9f5b38863a98af1b | |
download | influxdb-udp-inserter-a4a1e5be3d19434a418b0358e1bb75d272392816.tar.gz influxdb-udp-inserter-a4a1e5be3d19434a418b0358e1bb75d272392816.zip |
prototype
-rw-r--r-- | .gitignore | 2 | ||||
-rw-r--r-- | README.md | 9 | ||||
-rw-r--r-- | influxdb_udp_inserter/__init__.py | 148 | ||||
-rw-r--r-- | influxdb_udp_inserter/sample_message_format.js | 18 | ||||
-rw-r--r-- | influxdb_udp_inserter/server_async.py | 74 | ||||
-rwxr-xr-x | server.py | 29 | ||||
-rw-r--r-- | test_client.py | 20 | ||||
-rw-r--r-- | test_serialize.py | 42 |
8 files changed, 342 insertions, 0 deletions
diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..64a7365 --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +/.idea +__pycache__ diff --git a/README.md b/README.md new file mode 100644 index 0000000..a58fed2 --- /dev/null +++ b/README.md @@ -0,0 +1,9 @@ +# UDP Wire protocol for influxdb + +Key points: + +- UDP datagramm based +- No ack's, no transport encryption +- No transport compression (there is not a huge gain and lack of support in micropython) +- Messages are authenticated by hash of secret value +- All client-relevant code is compatible with micropython
\ No newline at end of file diff --git a/influxdb_udp_inserter/__init__.py b/influxdb_udp_inserter/__init__.py new file mode 100644 index 0000000..9c51951 --- /dev/null +++ b/influxdb_udp_inserter/__init__.py @@ -0,0 +1,148 @@ +try: + import struct +except ImportError: + import ustruct as struct + +try: + import socket +except ImportError: + import usocket as socket + + +class Struct: + def __init__(self, fmt): + self.fmt = fmt + self.size = struct.calcsize(self.fmt) + + def pack(self, value): + return struct.pack(self.fmt, value) + + def unpack(self, data): + return struct.unpack(self.fmt, data)[0] + + +UINT8 = Struct('B') +UINT16 = Struct('H') +UINT32 = Struct('I') + +try: + import hashlib +except ImportError: + import uhashlib as hashlib + + +class SerializerFactory: + def __init__(self): + self.message_formats = {} + + def add_message_format(self, description): + if not 'identifier' in description: + raise Exception('Missing required option "identifier"') + identifier = description['identifier'] + if not isinstance(identifier, bytes): + identifier = bytes(identifier) + if identifier in self.message_formats: + raise Exception('There is already a message defined with identifier', description['identifier']) + self.message_formats[identifier] = description + + def get_serializer(self, identifier): + if not isinstance(identifier, bytes): + identifier = bytes(identifier) + if identifier in self.message_formats: + return MessageSerializer.from_config(self.message_formats[identifier]) + else: + raise Exception('No message format with identifier', identifier) + + +class MessageSerializer: + @staticmethod + def _make_fields(fields): + for field_name, *values in fields: + v = [] + for name, value in values: + if value.lower() == 'uint16': + v += [(name, UINT16)] + elif value.lower() == 'uint32': + v += [(name, UINT32)] + elif value.lower() == 'uint8': + v += [(name, UINT8)] + else: + raise Exception('Datatype not supported', value) + result = [field_name] + v + yield result + + @staticmethod + def from_config(config): + identifier = bytes(config['identifier']) + secret = bytes(config['secret']) + database = config['database'] + fields = list(MessageSerializer._make_fields(config['fields'])) + return MessageSerializer(identifier, database, secret, fields) + + def __init__(self, identifier, database, secret, fields): + self.identifier = identifier + self.database = database + self.secret = secret + self.fields = fields + self.size = sum(map(lambda f: sum(v[1].size for v in f[1:]), self.fields)) + + def serialize(self, data: dict): + if len(data) != len(self.fields): + raise Exception() + if set(data.keys()) != set(map(lambda v: v[0], self.fields)): + raise Exception("Data does not match schema") + + payload = bytes() + + for config_name, *config_values in self.fields: + data_values = data[config_name] + if set(data_values.keys()) != set(map(lambda kv: kv[0], config_values)): + raise Exception("inconsistent data for ", config_name, data_values.keys(), dict(config_values).keys()) + + for config_sub_name, config_struct in config_values: + payload += config_struct.pack(data_values[config_sub_name]) + + buf = bytes(self.identifier[0:3]) + payload + + hash = hashlib.sha256(buf + self.secret).digest() + buf += hash[0:6] + + return buf + + def deserialize(self, data): + if len(data) < 3 + 6: + raise Exception('Message of wrong size', len(data)) + + hash = hashlib.sha256(data[:-6] + self.secret).digest() + if hash[0:6] != data[-6:]: + raise Exception("Failed to authenticate message") + + payload = data[3:-6] + if len(payload) != self.size: + raise Exception('Message of wrong payload size', len(payload)) + + result = [] + i = 0 + for config_name, *config_values in self.fields: + result_field = {} + + for config_sub_name, config_struct in config_values: + window = payload[i:i + config_struct.size] + result_field[config_sub_name] = config_struct.unpack(window) + i += config_struct.size + + result += [(config_name, result_field)] + + return dict(result) + + +def send(host: str, port: int, serializer: MessageSerializer, data: dict): + serialized_data = serializer.serialize(data) + + sockaddr = socket.getaddrinfo(host, port)[0][-1] + s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + try: + s.connect(sockaddr) + s.send(serialized_data) + finally: + s.close() diff --git a/influxdb_udp_inserter/sample_message_format.js b/influxdb_udp_inserter/sample_message_format.js new file mode 100644 index 0000000..41d37e8 --- /dev/null +++ b/influxdb_udp_inserter/sample_message_format.js @@ -0,0 +1,18 @@ +{ + "identifier": [1, 1, 1], + "secret": [12, 23, 23, 11, 244, 23, 222, 123], + "database": "data", + "fields": [ + ["inverter0.PVVoltage1", ["min", "uint16"], ["max", "uint16"], ["last","uint16"]], + ["inverter0.PVVoltage2", ["min", "uint16"], ["max", "uint16"], ["last","uint16"]], + ["inverter0.PVVoltage3", ["min", "uint16"], ["max", "uint16"], ["last","uint16"]], + ["inverter0.PVCurrent1", ["min", "uint16"], ["max", "uint16"], ["last","uint16"]], + ["inverter0.PVCurrent2", ["min", "uint16"], ["max", "uint16"], ["last","uint16"]], + ["inverter0.PVCurrent3", ["min", "uint16"], ["max", "uint16"], ["last","uint16"]], + ["inverter0.GridVoltage", ["min", "uint16"], ["max", "uint16"], ["last","uint16"]], + ["inverter0.GridFrequency", ["min", "uint16"], ["max", "uint16"], ["last","uint16"]], + ["inverter0.SmoothedInstantEnergyProduction", ["min", "uint16"], ["max", "uint16"], ["last","uint16"]], + ["inverter0.LatestEvent", ["value","uint16"]], + ["inverter0.LatestEventModule", ["value","uint16"]] + ] +}
\ No newline at end of file diff --git a/influxdb_udp_inserter/server_async.py b/influxdb_udp_inserter/server_async.py new file mode 100644 index 0000000..5e2933a --- /dev/null +++ b/influxdb_udp_inserter/server_async.py @@ -0,0 +1,74 @@ +from . import SerializerFactory + +from influxdb import line_protocol + +import logging +import json +import asyncio +import aiohttp + + +class UdpInserterProtocol(asyncio.DatagramProtocol): + def __init__(self, factory: SerializerFactory, influx_url: str): + self.factory = factory + self.influx_url = influx_url + self.transport: asyncio.Transport = None + + def connection_made(self, transport): + self.transport = transport + + def datagram_received(self, data, addr): + logging.info('Received %s bytes: %r(...) from %s', len(data), data[0:3], addr) + if len(data) < 7: return + + serializer = self.factory.get_serializer(data[0:3]) + mesg = serializer.deserialize(data) + + influxdb_points = [] + for key, value in mesg.items(): + influxdb_points.append({ + 'measurement': key, + 'tags': {}, + 'fields': dict(value) + }) + + post_data = line_protocol.make_lines({'points': influxdb_points}).encode() + + asyncio.ensure_future(send(self.influx_url + '?db=' + serializer.database, post_data)) + + +async def send(url, data): + logging.info("Send data: %r", data) + async with aiohttp.ClientSession() as client_session: + async with client_session.post(url, + headers={aiohttp.hdrs.CONTENT_TYPE: 'application/x-www-form-urlencoded'}, + data=data) as http_resp: + body = await http_resp.read() + logging.info("Received response %s (0..200): %r", http_resp.status, body[0:200]) + http_resp.close() + + +def create_endpoint(factory: SerializerFactory, influx_url: str, *args, **kwargs): + loop = asyncio.get_event_loop() + listen = loop.create_datagram_endpoint(lambda: UdpInserterProtocol(factory, influx_url), *args, **kwargs) + return listen + + +class Server: + def __init__(self, influx_url: str, *args, **kwargs): + self.influx_url = influx_url + self.serializer_factory = SerializerFactory() + self.listen = create_endpoint(self.serializer_factory, self.influx_url, *args, **kwargs) + + def run(self): + loop = asyncio.get_event_loop() + transport, protocol = loop.run_until_complete(self.listen) + try: + loop.run_forever() + finally: + transport.close() + loop.close() + + def read_message_format_file(self, path): + config = json.load(open(path)) + self.serializer_factory.add_message_format(config) diff --git a/server.py b/server.py new file mode 100755 index 0000000..454f4a6 --- /dev/null +++ b/server.py @@ -0,0 +1,29 @@ +#!/usr/bin/env python3 +from influxdb_udp_inserter.server_async import Server + +import argparse +import glob +import logging + +def main(): + logging.basicConfig(level=logging.INFO) + parser = argparse.ArgumentParser() + parser.add_argument('--url', nargs=1, metavar='URL', help='Influxdb URL for Line Protocol', + required=True) + parser.add_argument('--formats', nargs=1, action='append', metavar='PATTERN', + help='Glob pattern to look for message formats') + parser.add_argument('--port', nargs=1, metavar='PORT', help='List on UDP port number', default=9999) + args = parser.parse_args() + + s = Server(args.url[0], local_addr=('0.0.0.0', args.port)) + + for pattern in args.formats: + for filepath in glob.glob(pattern[0]): + logging.info("Add message format from file: %s", filepath) + s.read_message_format_file(filepath) + + s.run() + + +if __name__ == '__main__': + main() diff --git a/test_client.py b/test_client.py new file mode 100644 index 0000000..a4c416f --- /dev/null +++ b/test_client.py @@ -0,0 +1,20 @@ +from influxdb_udp_inserter import send, MessageSerializer + +try: + import json +except ImportError: + import ujson as json + +with open('influxdb_udp_inserter/sample_message_format.js') as fp: + serializer = MessageSerializer.from_config(json.load(fp)) + + + +fake_data = {} +for field_name, *values in serializer.fields: + fake_data[field_name] = {} + for name, value in values: + fake_data[field_name][name] = sum(map(ord, field_name+name)) % 0xff +send('0.0.0.0', 9999, serializer, fake_data) + +print('done') diff --git a/test_serialize.py b/test_serialize.py new file mode 100644 index 0000000..1f5d5b0 --- /dev/null +++ b/test_serialize.py @@ -0,0 +1,42 @@ +from influxdb_udp_inserter import SerializerFactory + +def test(): + try: + import os + except ImportError: + import uos as os + try: + import json + except ImportError: + import ujson as json + + with open('influxdb_udp_inserter/sample_message_format.js') as fp: + message_format = json.load(fp) + + factory = SerializerFactory() + factory.add_message_format(message_format) + + identifier = bytes((0x01, 0x01, 0x01)) + fake_data = {} + for field_name, *values in factory.get_serializer(identifier).fields: + fake_data[field_name] = {} + for name, value in values: + fake_data[field_name][name] = sum(map(ord, field_name+name)) % 0xff + + print(fake_data) + serialized = factory.get_serializer(identifier).serialize(fake_data) + print(serialized) + + print("{}byte => 360 * 24 * 12 => {}Mb".format(len(serialized), (len(serialized) * 360 * 24 * 12) / 1024 / 1024)) + + result = factory.get_serializer(identifier).deserialize(serialized) + print(result) + + if fake_data != result: + raise Exception() + else: + print("Input/Output verified correctly") + + +if 1==1: + test() |