Skip to content
Snippets Groups Projects
hedgedoc-util.py 9.83 KiB
Newer Older
  • Learn to ignore specific revisions
  • nd's avatar
    nd committed
    #!/usr/bin/env python3
    import sys
    import json
    import binascii
    import base64
    from email.message import EmailMessage
    from subprocess import Popen, PIPE
    
    import click
    import pymysql
    import pymysql.cursors
    import configparser
    
    class GlobalState():
    	"""State object stored on the click context and shared by all commands.
    
    	Bundles the chosen output format, the parsed configuration file and an
    	open database connection. Constructing it loads the config, connects
    	and checks the database schema.
    	"""
    
    	def __init__(self, outputformat):
    		self.outputformat = outputformat
    		# Order matters: connecting reads the config, the schema check uses the connection.
    		self.config = self._load_config()
    		self.db = self._get_connection()
    		self._check_schema()
    
    	def _load_config(self):
    		"""Parse the fixed configuration file path and return the parser."""
    		parser = configparser.ConfigParser()
    		parser.read('/usr/local/etc/hedgedoc-util/hedgedoc-util.cfg')
    		return parser
    
    	def _get_connection(self):
    		"""Open a pymysql connection from the ``db`` config section, yielding dict rows."""
    		settings = self.config['db']
    		return pymysql.connect(
    			host=settings['host'],
    			user=settings['username'],
    			password=settings['password'],
    			database=settings['database'],
    			charset='utf8mb4',
    			cursorclass=pymysql.cursors.DictCursor,
    		)
    
    	def _check_schema(self):
    		"""Exit with status 2 unless the names in SequelizeMeta match the configured schema string."""
    		with self.db.cursor() as cursor:
    			cursor.execute('SELECT name from SequelizeMeta ORDER BY name ASC')
    			rows = cursor.fetchall()
    		# The configured value is the comma-joined, name-sorted list of entries.
    		found = ','.join(row['name'] for row in rows)
    		if found == self.config['db']['schema']:
    			return
    		click.echo("Unsupportet db schema: {}".format(found))
    		sys.exit(2)
    
    def _default_template_mail():
    	"""Return the ``mail`` entry of the ``templates`` config section of the current click context."""
    	return click.get_current_context().obj.config['templates']['mail']
    
    def _date_json_handler(obj):
    	"""Fallback serializer for ``json.dumps``: use ``isoformat()`` when the object has it, else return it unchanged."""
    	if hasattr(obj, 'isoformat'):
    		return obj.isoformat()
    	return obj
    
    def _decode_nested_json(data, fieldnames):
    	"""Decode JSON-encoded string fields of result rows in place.
    
    	data is an iterable of dict-like rows and fieldnames lists the keys
    	whose values hold JSON text. Keys that are absent from a row and values
    	that are None are left untouched. Returns the same ``data`` object.
    	Raises whatever ``json.loads`` raises on malformed JSON.
    	"""
    	for row in data:
    		for fieldname in fieldnames:
    			# Identity check instead of "== None": only a real None is skipped.
    			if fieldname in row and row[fieldname] is not None:
    				row[fieldname] = json.loads(row[fieldname])
    	return data
    
    # See https://github.com/hedgedoc/hedgedoc/blob/2b8aac289a326468ab4eecb442acd59298541399/lib/models/note.js#L165-L181
    
    nd's avatar
    nd committed
    # Whoever came up with this shit should be hit repeatedly with a blunt object until they expire. At the very least you owe me a beer.
    
    nd's avatar
    nd committed
    def note_id_decode_from_url(input_id):
    	"""Turn an unpadded url-safe base64 note id into its dashed 8-4-4-4-12 hex form."""
    	# The url form has its padding stripped; re-add it before decoding.
    	hexdigits = base64.urlsafe_b64decode(input_id + '==').hex()
    	bounds = ((0, 8), (8, 12), (12, 16), (16, 20), (20, 32))
    	return '-'.join(hexdigits[start:stop] for start, stop in bounds)
    def note_id_encode_to_url(input_id):
    	"""Turn a dashed hex note id into url-safe base64 with the ``=`` padding removed."""
    	hexdigits = input_id.replace('-', '')
    	raw = binascii.unhexlify(hexdigits.encode())
    	encoded = base64.urlsafe_b64encode(raw).decode()
    	# "=" only ever appears as trailing padding in base64 output.
    	return encoded.rstrip('=')
    
    def pad_list(db, columns, last_change_older=0, owner=0):
    	"""Return rows of the Notes table restricted to ``columns``, ordered by id.
    
    	last_change_older (days) and owner (user id) are optional filters; the
    	query disables a filter when its value compares equal to 0 / "".
    	The ``authorship`` column, if selected, is JSON-decoded.
    	"""
    	# this is no sql injection vulnerability because we let click verify the content of "columns" to match a whitelist
    	query = 'SELECT {} FROM Notes WHERE (%s = 0 OR DATEDIFF(NOW(),lastchangeAt) > %s) AND (%s = "" or ownerId = %s) ORDER BY id'.format(','.join(columns))
    	# Each filter value appears twice: once for the "disabled" test, once for the comparison.
    	params = (last_change_older, last_change_older, owner, owner)
    	with db.cursor() as cursor:
    		cursor.execute(query, params)
    		rows = cursor.fetchall()
    	return _decode_nested_json(rows, ['authorship'])
    
    def pad_get(db, id):
    	"""Fetch a single row of the Notes table by its database id.
    
    	Returns the row with its ``authorship`` column JSON-decoded.
    	Raises click.ClickException when no note with that id exists, instead
    	of failing later with an opaque TypeError on the missing row.
    	"""
    	with db.cursor() as cursor:
    		cursor.execute('SELECT * FROM Notes WHERE id=%s', (id,))
    		row = cursor.fetchone()
    	if row is None:
    		raise click.ClickException("No pad with id {}".format(id))
    	return _decode_nested_json([row], ['authorship'])[0]
    
    def pad_get_content(db, id):
    	"""Return the ``content`` value of the pad with the given id ('' when the key is absent)."""
    	pad = pad_get(db, id)
    	return pad.get('content', '')
    
    def pad_delete(db, id):
    	"""Delete a pad, its revisions and all references to it in user histories.
    
    	User histories store the url form of the note id, so matching entries
    	are found and filtered using that form.
    	"""
    	# Result is unused; the lookup simply runs before anything is deleted.
    	pad_get(db, id)
    	url_id = note_id_encode_to_url(id)
    	# NOTE(review): what leaving "with db:" does depends on the pymysql version - confirm.
    	with db:
    		with db.cursor() as cursor:
    			cursor.execute('DELETE FROM Revisions WHERE noteId=%s', id)
    			cursor.execute('DELETE FROM Notes WHERE id=%s', id)
    			cursor.execute('SELECT id,history FROM Users WHERE JSON_SEARCH(history, "one", %s, "", "$[*].id") is not null;', url_id)
    			# A second cursor performs the updates while the first one is still being read.
    			with db.cursor() as usercursor:
    				for user in cursor:
    					entries = json.loads(user['history'] or '[]')
    					kept = [entry for entry in entries if entry.get('id') != url_id]
    					usercursor.execute('UPDATE Users set history=%s WHERE id=%s;', (json.dumps(kept), user['id']))
    		db.commit()
    
    def pad_mail(db, id, template, formats):
    	"""Mail a pad to its owner through the local ``sendmail`` program.
    
    	template is a ``str.format`` template that may reference
    	``{content_markdown}`` and ``{content_dokuwiki}``; a format that was not
    	requested is passed to the template as None. formats is a collection
    	containing 'markdown' and/or 'dokuwiki'; each requested format is also
    	added as an attachment. The dokuwiki version is produced with ``pandoc``.
    
    	Raises click.ClickException when the pad has no owner with a known mail
    	address, or when pandoc or sendmail exit with a non-zero status, so that
    	a failed delivery is never mistaken for success.
    	"""
    	pad = pad_get(db, id)
    	# Values may be None (other code in this file checks for that), so fall back explicitly.
    	content = pad.get('content') or ''
    	title = pad.get('title') or '<no title>'
    	owner_id = pad.get('ownerId')
    	recipient = user_get_mail(db, owner_id) if owner_id else None
    	if not recipient:
    		raise click.ClickException("No mail address found for the owner of pad {}".format(id))
    
    	converted_formats = {}
    	if 'dokuwiki' in formats:
    		converter = Popen(["pandoc", "-o", "/dev/stdout", "-s", "-f", "markdown", "-t", "dokuwiki", "-i", "/dev/stdin"], stdin=PIPE, stdout=PIPE)
    		out, _ = converter.communicate(content.encode())
    		if converter.returncode != 0:
    			raise click.ClickException("pandoc exited with status {}".format(converter.returncode))
    		converted_formats['dokuwiki'] = out.decode()
    	if 'markdown' in formats:
    		converted_formats['markdown'] = content
    
    	msg = EmailMessage()
    	msg["From"] = "hedgedoc"
    	msg["To"] = recipient
    	msg["Subject"] = "Your pad with title \"{}\"".format(title)
    	msg.set_content(template.format(content_dokuwiki=converted_formats.get('dokuwiki'), content_markdown=converted_formats.get('markdown')))
    	if 'dokuwiki' in formats:
    		msg.add_attachment(converted_formats['dokuwiki'].encode(), maintype="text", subtype="plain", filename="pad-{}.dokuwiki.txt".format(pad.get("title")))
    	if 'markdown' in formats:
    		msg.add_attachment(converted_formats['markdown'].encode(), maintype="text", subtype="markdown", filename="pad-{}.md".format(pad.get("title")))
    
    	# "-t" makes sendmail take the recipients from the message headers.
    	mailer = Popen(["sendmail", "-t", "-oi"], stdin=PIPE)
    	mailer.communicate(msg.as_bytes())
    	if mailer.returncode != 0:
    		raise click.ClickException("sendmail exited with status {}".format(mailer.returncode))
    
    def user_list(db, columns):
    	"""Return rows of the Users table restricted to ``columns``, ordered by id, with ``profile`` and ``history`` JSON-decoded."""
    	# this is no sql injection vulnerability because we let click verify the content of "columns" to match a whitelist
    	query = 'SELECT {} FROM Users ORDER BY id'.format(','.join(columns))
    	with db.cursor() as cursor:
    		cursor.execute(query)
    		rows = cursor.fetchall()
    	return _decode_nested_json(rows, ['profile', 'history'])
    
    def user_get(db, id):
    	"""Fetch a single row of the Users table by its id.
    
    	Returns the row with its ``profile`` and ``history`` columns
    	JSON-decoded. Raises click.ClickException when no user with that id
    	exists, instead of failing later with an opaque TypeError on the
    	missing row.
    	"""
    	with db.cursor() as cursor:
    		cursor.execute('SELECT * FROM Users WHERE id=%s', (id,))
    		row = cursor.fetchone()
    	if row is None:
    		raise click.ClickException("No user with id {}".format(id))
    	return _decode_nested_json([row], ['profile', 'history'])[0]
    
    def user_get_mail(db, id):
    	"""Find a mail address for a user.
    
    	Checks, in order, the ``email`` column, then the ``email`` and ``mail``
    	keys of the decoded ``profile``. Returns the first non-empty value, or
    	None when nothing is found. Keys missing from the profile are treated
    	as empty, so a profile without ``email`` still falls through to ``mail``.
    	"""
    	user = user_get(db, id)
    	if user.get('email'):
    		return user['email']
    	profile = user.get('profile')
    	# Only a JSON object can carry the keys looked up below.
    	if not isinstance(profile, dict):
    		return None
    	return profile.get('email') or profile.get('mail') or None
    
    def tsv_escape(value):
    	"""Render a value as one TSV cell.
    
    	Anything that is not exactly a ``str`` is serialized with ``json.dumps``
    	first; backslash, tab, newline and carriage return are then replaced by
    	their backslash escapes.
    	"""
    	if type(value) is not str:
    		value = json.dumps(value, default=_date_json_handler)
    	# Single pass; none of the replacements produces a character that is itself escaped.
    	escapes = str.maketrans({'\\': '\\\\', '\t': '\\t', '\n': '\\n', '\r': '\\r'})
    	return value.translate(escapes)
    
    def output_object(obj):
    	"""Print ``obj`` in the output format selected on the current click context.
    
    	text: echo the object as is.
    	json: echo it serialized with ``json.dumps``.
    	tsv / tsv-noheader: a dict is treated as a single row and a list or
    	tuple as a sequence of dict rows; the first row's keys define the
    	columns. Any other value (for example a plain string or None) is echoed
    	as is, and an empty sequence prints nothing.
    	"""
    	fmt = click.get_current_context().obj.outputformat
    	if fmt == "text":
    		click.echo(obj)
    	elif fmt == "json":
    		click.echo(json.dumps(obj, default=_date_json_handler))
    	elif fmt in ("tsv", "tsv-noheader"):
    		if isinstance(obj, dict):
    			obj = [obj]
    		# NOTE(review): tuples are accepted because the db driver may hand back an empty result set as a tuple - confirm.
    		if not isinstance(obj, (list, tuple)):
    			# Scalars have no columns; stop here instead of trying to index into them.
    			click.echo(obj)
    			return
    		if not obj:
    			return
    		header = list(obj[0].keys())
    		if fmt != "tsv-noheader":
    			click.echo('\t'.join(header))
    		for row in obj:
    			click.echo('\t'.join(tsv_escape(row.get(column, '')) for column in header))
    
    
    # Root command group: builds the shared GlobalState for all subcommands.
    @click.group()
    @click.option(
    	'-o', '--output',
    	type=click.Choice(['text', 'json', 'tsv', 'tsv-noheader']),
    	default='text',
    	help='Select output format',
    	show_default=True,
    )
    @click.pass_context
    def cli(ctx, output):
    	state = GlobalState(output)
    	ctx.obj = state
    
    # Reports whether the shared state holds both a connection and a config.
    @cli.command(
    	name="test-connect",
    	help="Checks wether the connection to the db works and if we support the used schema",
    )
    @click.pass_obj
    def _test_connect(obj):
    	usable = obj.db and obj.config
    	if not usable:
    		click.echo("Connection failed")
    		sys.exit(1)
    	click.echo("OK")
    	sys.exit(0)
    
    # Container group for the "pad" subcommands; it has no behavior of its own.
    @cli.group(
    	name="pad",
    	help="Actions regarding pads",
    )
    def cli_pad():
    	pass
    
    # "pad list": prints the selected columns of all pads matching the filters.
    @cli_pad.command(name="list", help="List all pads")
    @click.option(
    	'-c', '--columns',
    	default=['id'],
    	type=click.Choice(['id', 'title', 'ownerId', 'createdAt', 'updatedAt', 'shortid', 'permission', 'viewcount', 'lastchangeuserId', 'lastchangeAt', 'alias', 'deletedAt', 'authorship']),
    	help="Select what data to display. Can be passed multiple times.",
    	multiple=True,
    	show_default=True,
    )
    @click.option(
    	'--last-change-older',
    	type=click.INT,
    	default=0,
    	help='Only list those pads which are older than this value. In days.',
    )
    @click.option(
    	'--owner',
    	type=click.STRING,
    	default='',
    	help='Only list pads with this owner, pass the user id',
    )
    @click.pass_obj
    def _pad_list(obj, columns, last_change_older, owner):
    	pads = pad_list(obj.db, columns, last_change_older=last_change_older, owner=owner)
    	output_object(pads)
    
    # "pad get": prints the full database row of one pad.
    @cli_pad.command(
    	name="get",
    	help="Get all data of one pad by its id",
    )
    @click.argument('id')
    @click.pass_obj
    def _pad_get(obj, id):
    	pad = pad_get(obj.db, id)
    	output_object(pad)
    
    # "pad delete": thin wrapper that hands the shared connection to pad_delete.
    @cli_pad.command(
    	name="delete",
    	help="Deletes a pad",
    )
    @click.argument('id')
    @click.pass_obj
    def _pad_delete(obj, id):
    	connection = obj.db
    	pad_delete(connection, id)
    
    # "pad get-content": prints only the content of one pad (not the whole row).
    @cli_pad.command(name="get-content", help="Get the content of one pad by its id")
    @click.argument('id')
    @click.pass_obj
    def _pad_get_content(obj, id):
    	output_object(pad_get_content(obj.db, id))
    
    # "pad mail": reads the template file and mails the pad in the requested formats.
    @cli_pad.command(name="mail", help="Send a pad to its creator via mail")
    @click.option(
    	'--template',
    	type=click.File(),
    	default=_default_template_mail,
    	help='The template file to use',
    	show_default=True,
    )
    @click.option(
    	'--convert',
    	default=['markdown'],
    	type=click.Choice(['markdown', 'dokuwiki']),
    	help="Add the pad as attachment to the mail. Can be passed multiple times.",
    	multiple=True,
    	show_default=True,
    )
    @click.argument('id')
    @click.pass_obj
    def _pad_mail(obj, id, template, convert):
    	template_text = template.read()
    	pad_mail(obj.db, id, template_text, formats=convert)
    
    # "pad urlid-to-id": pure conversion, no database access in this function.
    @cli_pad.command(
    	name="urlid-to-id",
    	help="Decode an pad id from urls to a database id",
    )
    @click.argument('id')
    def _urlid_to_id(id):
    	database_id = note_id_decode_from_url(id)
    	output_object(database_id)
    
    # "pad id-to-urlid": pure conversion, no database access in this function.
    @cli_pad.command(
    	name="id-to-urlid",
    	help="Encode an pad id to a from used in urls",
    )
    @click.argument('id')
    def _id_to_urlid(id):
    	url_id = note_id_encode_to_url(id)
    	output_object(url_id)
    
    # Container group for the "user" subcommands; it has no behavior of its own.
    @cli.group(
    	name="user",
    	help="Actions regarding users",
    )
    def cli_user():
    	pass
    
    # "user list": prints the selected columns of all users.
    @cli_user.command(name="list", help="List all user")
    @click.option(
    	'-c', '--columns',
    	default=['id'],
    	type=click.Choice(['id', 'profileid', 'profile', 'history', 'createdAt', 'updatedAt', 'accessToken', 'refreshToken', 'email', 'password', 'deleteToken']),
    	help="Select what data to display. Can be passed multiple times.",
    	multiple=True,
    	show_default=True,
    )
    @click.pass_obj
    def _user_list(obj, columns):
    	users = user_list(obj.db, columns)
    	output_object(users)
    
    # "user get": prints the full database row of one user.
    @cli_user.command(
    	name="get",
    	help="Get all data of a user by its id",
    )
    @click.argument('id')
    @click.pass_obj
    def _user_get(obj, id):
    	user = user_get(obj.db, id)
    	output_object(user)
    
    # "user get-mail": prints the address found by user_get_mail (None when there is none).
    @cli_user.command(
    	name="get-mail",
    	help="Find the mail adress of a user, will search in multiple fields",
    )
    @click.argument('id')
    @click.pass_obj
    def _user_get_mail(obj, id):
    	address = user_get_mail(obj.db, id)
    	output_object(address)
    
    # Run the click command group only when the file is executed as a script.
    if __name__ == '__main__':
    	cli()