#!/usr/bin/env python3
"""Command line helper for inspecting and maintaining a HedgeDoc database."""

import base64
import binascii
import configparser
import json
import sys
from email.message import EmailMessage
from subprocess import Popen, PIPE

import click
import pymysql
import pymysql.cursors


class GlobalState():
    """Shared state of one CLI invocation.

    Attributes:
        outputformat: name of the output format selected on the command line.
        config: the parsed configuration file.
        db: an open pymysql connection that yields rows as dictionaries.
    """

    # Location of the configuration file read by _load_config().
    CONFIG_PATH = '/usr/local/etc/hedgedoc-util/hedgedoc-util.cfg'

    def __init__(self, outputformat):
        """Load the config, connect to the database and verify its schema."""
        self.outputformat = outputformat
        # Order matters: the connection needs the config, the schema check
        # needs the connection.
        self.config = self._load_config()
        self.db = self._get_connection()
        self._check_schema()

    def _get_connection(self):
        """Open a database connection using the ``[db]`` config section."""
        db_config = self.config['db']
        return pymysql.connect(host=db_config['host'],
                               user=db_config['username'],
                               password=db_config['password'],
                               database=db_config['database'],
                               charset='utf8mb4',
                               cursorclass=pymysql.cursors.DictCursor)

    def _check_schema(self):
        """Exit with status 2 unless the applied migrations match the config.

        The names stored in ``SequelizeMeta`` are joined with commas (sorted
        ascending) and compared against the ``schema`` value of the ``[db]``
        config section.
        """
        with self.db.cursor() as cursor:
            cursor.execute('SELECT name from SequelizeMeta ORDER BY name ASC')
            schema = ','.join(row['name'] for row in cursor.fetchall())
        if schema != self.config['db']['schema']:
            click.echo("Unsupportet db schema: {}".format(schema))
            sys.exit(2)

    def _load_config(self):
        """Parse the configuration file and return the ConfigParser.

        Exits with status 1 when the file cannot be read, instead of failing
        later with a KeyError on the first missing section.
        """
        config = configparser.ConfigParser()
        # ConfigParser.read() silently skips unreadable files and returns the
        # list of files it managed to parse.
        if not config.read(self.CONFIG_PATH):
            click.echo("Cannot read config file: {}".format(self.CONFIG_PATH), err=True)
            sys.exit(1)
        return config


def _default_template_mail():
    """Return the mail template path configured under ``[templates] mail``."""
    ctx = click.get_current_context().obj
    return ctx.config['templates']['mail']


def _date_json_handler(obj):
    """``default=`` hook for json.dumps: serialise objects with isoformat().

    Raises:
        TypeError: if the object offers no ``isoformat`` method. Returning the
            object unchanged would make json.dumps fail with a misleading
            "Circular reference detected" error.
    """
    if hasattr(obj, 'isoformat'):
        return obj.isoformat()
    raise TypeError("Object of type {} is not JSON serializable".format(type(obj).__name__))


def _decode_nested_json(data, fieldnames):
    """Decode JSON strings stored in selected fields of database rows.

    Args:
        data: list of row dictionaries; ``None`` entries (no row found) are
            skipped.
        fieldnames: names of the fields whose value is a JSON document.

    Returns:
        The same list, with the rows modified in place.
    """
    for row in data:
        if row is None:
            continue
        for fieldname in fieldnames:
            value = row.get(fieldname)
            # Missing and NULL fields stay as they are; an empty string is
            # not valid JSON and is left untouched as well.
            if value is None or value == '':
                continue
            row[fieldname] = json.loads(value)
    return data


# See https://github.com/hedgedoc/hedgedoc/blob/2b8aac289a326468ab4eecb442acd59298541399/lib/models/note.js#L165-L181
# The id shown in pad URLs is an encoded form of the database id. The original
# author of this tool was not amused by that scheme and feels owed a beer.
def note_id_decode_from_url(input_id):
    """Convert the id used in pad URLs into the dashed database id."""
    # '==' restores the base64 padding that note_id_encode_to_url() strips.
    raw = binascii.hexlify(base64.urlsafe_b64decode(input_id + '==')).decode()
    return '-'.join((raw[0:8], raw[8:12], raw[12:16], raw[16:20], raw[20:32]))


def note_id_encode_to_url(input_id):
    """Convert a dashed database id into the unpadded id used in pad URLs."""
    raw = binascii.unhexlify(input_id.replace('-', '').encode())
    return base64.urlsafe_b64encode(raw).decode().replace('=', '')


def _require_row(row, kind, id):
    """Return ``row``, or abort the command when the lookup found nothing."""
    if row is None:
        raise click.ClickException("No {} with id {}".format(kind, id))
    return row


def pad_list(db, columns, last_change_older=0, owner=0):
    """Return the selected columns of all pads, optionally filtered.

    Args:
        db: open database connection.
        columns: column names to select; must come from a trusted whitelist
            because they are interpolated into the statement.
        last_change_older: only pads whose last change is more than this many
            days ago; 0 disables the filter.
        owner: only pads owned by this user id; the default disables the
            filter.
    """
    with db.cursor() as cursor:
        # this is no sql injection vulnerability because we let click verify the content of "columns" to match a whitelist
        query = ('SELECT {} FROM Notes WHERE (%s = 0 OR DATEDIFF(NOW(),lastchangeAt) > %s) '
                 'AND (%s = "" or ownerId = %s) ORDER BY id').format(','.join(columns))
        cursor.execute(query, (last_change_older, last_change_older, owner, owner))
        return _decode_nested_json(cursor.fetchall(), ['authorship'])


def pad_get(db, id):
    """Return the full row of one pad, or None when the id is unknown."""
    with db.cursor() as cursor:
        cursor.execute('SELECT * FROM Notes WHERE id=%s', (id,))
        row = cursor.fetchone()
    if row is None:
        return None
    return _decode_nested_json([row], ['authorship'])[0]


def pad_get_content(db, id):
    """Return the content of one pad; abort when the pad does not exist."""
    return _require_row(pad_get(db, id), 'pad', id).get('content', '')


def pad_delete(db, id):
    """Delete a pad, its revisions and its entries in the users' histories.

    All statements are committed together; on any error the transaction is
    rolled back and the error is re-raised. Aborts when the pad does not
    exist.
    """
    _require_row(pad_get(db, id), 'pad', id)
    urlid = note_id_encode_to_url(id)
    # Commit and roll back explicitly rather than via `with db:`, because
    # what the connection's context manager does differs between PyMySQL
    # releases (newer ones close the connection on exit).
    try:
        with db.cursor() as cursor:
            cursor.execute('DELETE FROM Revisions WHERE noteId=%s', (id,))
            cursor.execute('DELETE FROM Notes WHERE id=%s', (id,))
            cursor.execute('SELECT id,history FROM Users WHERE JSON_SEARCH(history, "one", %s, "", "$[*].id") is not null;', (urlid,))
            # Fetch everything first so the cursor can be reused for updates.
            for user in cursor.fetchall():
                history = json.loads(user['history'] or '[]')
                remaining = [entry for entry in history if entry.get('id') != urlid]
                cursor.execute('UPDATE Users set history=%s WHERE id=%s;', (json.dumps(remaining), user['id']))
        db.commit()
    except Exception:
        db.rollback()
        raise


def pad_mail(db, id, template, formats):
    """Mail a pad to its owner, attaching it in the requested formats.

    Args:
        db: open database connection.
        id: database id of the pad.
        template: mail body; may use the ``{content_markdown}`` and
            ``{content_dokuwiki}`` placeholders.
        formats: iterable containing ``'markdown'`` and/or ``'dokuwiki'``.

    Raises:
        click.ClickException: if the pad does not exist, has no owner, the
            owner has no mail address, or pandoc/sendmail exit with an error.
    """
    pad = _require_row(pad_get(db, id), 'pad', id)
    if pad.get("ownerId") is None:
        raise click.ClickException("Pad {} has no owner".format(id))
    recipient = user_get_mail(db, pad["ownerId"])
    if not recipient:
        raise click.ClickException("No mail address found for the owner of pad {}".format(id))

    content = pad.get('content') or ''
    converted_formats = {}
    if 'dokuwiki' in formats:
        converter = Popen(["pandoc", "-o", "/dev/stdout", "-s", "-f", "markdown", "-t", "dokuwiki", "-i", "/dev/stdin"], stdin=PIPE, stdout=PIPE)
        out, _ = converter.communicate(content.encode())
        if converter.returncode != 0:
            raise click.ClickException("pandoc failed with exit code {}".format(converter.returncode))
        converted_formats['dokuwiki'] = out.decode()
    if 'markdown' in formats:
        converted_formats['markdown'] = content

    msg = EmailMessage()
    msg["From"] = "hedgedoc"
    msg["To"] = recipient
    msg["Subject"] = "Your pad with title \"{}\"".format(pad.get("title") or "<no title>")
    msg.set_content(template.format(content_dokuwiki=converted_formats.get('dokuwiki'),
                                    content_markdown=converted_formats.get('markdown')))
    if 'dokuwiki' in formats:
        msg.add_attachment(converted_formats['dokuwiki'].encode(), maintype="text", subtype="plain",
                           filename="pad-{}.dokuwiki.txt".format(pad.get("title")))
    if 'markdown' in formats:
        msg.add_attachment(converted_formats['markdown'].encode(), maintype="text", subtype="markdown",
                           filename="pad-{}.md".format(pad.get("title")))

    # -t: take the recipients from the headers, -oi: a lone dot does not end
    # the message.
    mailer = Popen(["sendmail", "-t", "-oi"], stdin=PIPE)
    mailer.communicate(msg.as_bytes())
    if mailer.returncode != 0:
        raise click.ClickException("sendmail failed with exit code {}".format(mailer.returncode))


def user_list(db, columns):
    """Return the selected columns of all users, ordered by id.

    ``columns`` must come from a trusted whitelist because the names are
    interpolated into the statement.
    """
    with db.cursor() as cursor:
        # this is no sql injection vulnerability because we let click verify the content of "columns" to match a whitelist
        cursor.execute('SELECT {} FROM Users ORDER BY id'.format(','.join(columns)))
        return _decode_nested_json(cursor.fetchall(), ['profile', 'history'])


def user_get(db, id):
    """Return the full row of one user, or None when the id is unknown."""
    with db.cursor() as cursor:
        cursor.execute('SELECT * FROM Users WHERE id=%s', (id,))
        row = cursor.fetchone()
    if row is None:
        return None
    return _decode_nested_json([row], ['profile', 'history'])[0]


def user_get_mail(db, id):
    """Return a mail address for the user, or None if none can be found.

    Checked in order: the ``email`` column, then the ``email`` and ``mail``
    keys of the decoded profile.
    """
    user = user_get(db, id)
    if not user:
        return None
    if user.get('email'):
        return user['email']
    profile = user.get('profile')
    if profile:
        # .get() so that a profile without an 'email' key still falls
        # through to 'mail' instead of raising KeyError.
        return profile.get('email') or profile.get('mail') or None
    return None


def tsv_escape(value):
    """Render a value as one TSV cell.

    Non-string values are JSON encoded first; backslash, tab, newline and
    carriage return are then backslash-escaped.
    """
    if not isinstance(value, str):
        value = json.dumps(value, default=_date_json_handler)
    # The backslash must be escaped first so the later escapes stay intact.
    return value.replace('\\', '\\\\').replace('\t', '\\t').replace('\n', '\\n').replace('\r', '\\r')


def output_object(obj):
    """Print ``obj`` in the output format selected for this invocation."""
    ctx = click.get_current_context().obj
    if ctx.outputformat == "text":
        click.echo(obj)
    elif ctx.outputformat == "json":
        click.echo(json.dumps(obj, default=_date_json_handler))
    elif ctx.outputformat in ("tsv", "tsv-noheader"):
        if not obj:
            return
        if isinstance(obj, dict):
            obj = [obj]
        if not isinstance(obj, list):
            # Scalars have no columns: print them as they are and stop,
            # rather than falling through to the row handling below.
            click.echo(obj)
            return
        # The first row decides which columns are printed, and their order.
        header = list(obj[0].keys())
        if ctx.outputformat != "tsv-noheader":
            click.echo('\t'.join(header))
        for row in obj:
            click.echo('\t'.join(tsv_escape(row.get(column, '')) for column in header))


@click.group()
@click.option('-o', '--output', type=click.Choice(['text', 'json', 'tsv', 'tsv-noheader']), default='text', help='Select output format', show_default=True)
@click.pass_context
def cli(ctx, output):
    # Every sub command receives this state via click.pass_obj.
    ctx.obj = GlobalState(output)


@cli.command(name="test-connect", help="Checks wether the connection to the db works and if we support the used schema")
@click.pass_obj
def _test_connect(obj):
    if obj.db and obj.config:
        click.echo("OK")
        sys.exit(0)
    else:
        click.echo("Connection failed")
        sys.exit(1)


@cli.group(name="pad", help="Actions regarding pads")
def cli_pad():
    pass


@cli_pad.command(name="list", help="List all pads")
@click.option('-c', '--columns', default=['id'], type=click.Choice(['id', 'title', 'ownerId', 'createdAt', 'updatedAt', 'shortid', 'permission', 'viewcount', 'lastchangeuserId', 'lastchangeAt', 'alias', 'deletedAt', 'authorship']), help="Select what data to display. Can be passed multiple times.", multiple=True, show_default=True)
@click.option('--last-change-older', type=click.INT, default=0, help='Only list those pads which are older than this value. In days.')
@click.option('--owner', type=click.STRING, default='', help='Only list pads with this owner, pass the user id')
@click.pass_obj
def _pad_list(obj, columns, last_change_older, owner):
    output_object(pad_list(obj.db, columns, last_change_older=last_change_older, owner=owner))


@cli_pad.command(name="get", help="Get all data of one pad by its id")
@click.argument('id')
@click.pass_obj
def _pad_get(obj, id):
    output_object(_require_row(pad_get(obj.db, id), 'pad', id))


@cli_pad.command(name="delete", help="Deletes a pad")
@click.argument('id')
@click.pass_obj
def _pad_delete(obj, id):
    pad_delete(obj.db, id)


@cli_pad.command(name="get-content", help="Get the content of one pad by its id")
@click.argument('id')
@click.pass_obj
def _pad_get_content(obj, id):
    output_object(pad_get_content(obj.db, id))


@cli_pad.command(name="mail", help="Send a pad to its creator via mail")
@click.option('--template', type=click.File(), default=_default_template_mail, help='The template file to use', show_default=True)
@click.option('--convert', default=['markdown'], type=click.Choice(['markdown', 'dokuwiki']), help="Add the pad as attachment to the mail. Can be passed multiple times.", multiple=True, show_default=True)
@click.argument('id')
@click.pass_obj
def _pad_mail(obj, id, template, convert):
    pad_mail(obj.db, id, template.read(), formats=convert)


@cli_pad.command(name="urlid-to-id", help="Decode an pad id from urls to a database id")
@click.argument('id')
def _urlid_to_id(id):
    output_object(note_id_decode_from_url(id))


@cli_pad.command(name="id-to-urlid", help="Encode an pad id to a from used in urls")
@click.argument('id')
def _id_to_urlid(id):
    output_object(note_id_encode_to_url(id))


@cli.group(name="user", help="Actions regarding users")
def cli_user():
    pass


@cli_user.command(name="list", help="List all user")
@click.option('-c', '--columns', default=['id'], type=click.Choice(['id', 'profileid', 'profile', 'history', 'createdAt', 'updatedAt', 'accessToken', 'refreshToken', 'email', 'password', 'deleteToken']), help="Select what data to display. Can be passed multiple times.", multiple=True, show_default=True)
@click.pass_obj
def _user_list(obj, columns):
    output_object(user_list(obj.db, columns))


@cli_user.command(name="get", help="Get all data of a user by its id")
@click.argument('id')
@click.pass_obj
def _user_get(obj, id):
    output_object(_require_row(user_get(obj.db, id), 'user', id))


@cli_user.command(name="get-mail", help="Find the mail adress of a user, will search in multiple fields")
@click.argument('id')
@click.pass_obj
def _user_get_mail(obj, id):
    output_object(user_get_mail(obj.db, id))


if __name__ == '__main__':
    cli()