From d1af8b7801a19ccfad2bf0c0625c22435673af0b Mon Sep 17 00:00:00 2001 From: Jean Raby Date: Tue, 3 May 2011 12:37:34 +0000 Subject: [PATCH] Imported managesieve.py: implements the client side of the managesieve protocol Added sieve tests (vacation, forwarding and simple sieve scripts) Added some caldav scheduling tests cases and resources planning tests. Added new variables to config.py.in for sieve resources planning tests. Monotone-Parent: d3ab72b923ee1d1bbd604ea5e7e14b54346aa0b3 Monotone-Revision: 53dfb1b04415248644ed626184df1362cd4ee2f1 Monotone-Author: jraby@inverse.ca Monotone-Date: 2011-05-03T12:37:34 Monotone-Branch: ca.inverse.sogo --- Tests/Integration/config.py.in | 6 + Tests/Integration/managesieve.py | 631 ++++++++++++++++++++ Tests/Integration/preferences.py | 35 +- Tests/Integration/test-caldav-scheduling.py | 172 +++++- Tests/Integration/test-sieve.py | 154 +++++ 5 files changed, 973 insertions(+), 25 deletions(-) create mode 100644 Tests/Integration/managesieve.py create mode 100644 Tests/Integration/test-sieve.py diff --git a/Tests/Integration/config.py.in b/Tests/Integration/config.py.in index c3c4a0a35..a70fc0595 100644 --- a/Tests/Integration/config.py.in +++ b/Tests/Integration/config.py.in @@ -12,6 +12,12 @@ subscriber_password = "otherpass" attendee1 = "user@domain.com" attendee1_delegate = "otheruser@domain.com" +resource_no_overbook = "res" +resource_can_overbook = "res-nolimit" + mailserver = "imaphost" testput_nbrdays = 30 + +sieve_server = "localhost" +sieve_port = 2000 diff --git a/Tests/Integration/managesieve.py b/Tests/Integration/managesieve.py new file mode 100644 index 000000000..b248b13c5 --- /dev/null +++ b/Tests/Integration/managesieve.py @@ -0,0 +1,631 @@ +"""Sieve management client. + +A Protocol for Remotely Managing Sieve Scripts +Based on +""" + +__version__ = "0.4.2" +__author__ = """Hartmut Goebel +Ulrich Eck April 2001 +""" + +import binascii, re, socket, time, random, sys +try: + import ssl + ssl_wrap_socket = ssl.wrap_socket +except ImportError: + ssl_wrap_socket = socket.ssl + +__all__ = [ 'MANAGESIEVE', 'SIEVE_PORT', 'OK', 'NO', 'BYE', 'Debug'] + +Debug = 0 +CRLF = '\r\n' +SIEVE_PORT = 2000 + +OK = 'OK' +NO = 'NO' +BYE = 'BYE' + +AUTH_PLAIN = "PLAIN" +AUTH_LOGIN = "LOGIN" +# authentication mechanisms currently supported +# in order of preference +AUTHMECHS = [AUTH_PLAIN, AUTH_LOGIN] + +# todo: return results or raise exceptions? +# todo: on result 'BYE' quit immediatly +# todo: raise exception on 'BYE'? + +# Commands +commands = { + # name valid states + 'STARTTLS': ('NONAUTH',), + 'AUTHENTICATE': ('NONAUTH',), + 'LOGOUT': ('NONAUTH', 'AUTH', 'LOGOUT'), + 'CAPABILITY': ('NONAUTH', 'AUTH'), + 'GETSCRIPT': ('AUTH', ), + 'PUTSCRIPT': ('AUTH', ), + 'SETACTIVE': ('AUTH', ), + 'DELETESCRIPT': ('AUTH', ), + 'LISTSCRIPTS': ('AUTH', ), + 'HAVESPACE': ('AUTH', ), + # bogus command to receive a NO after STARTTLS (see starttls() ) + 'BOGUS': ('NONAUTH', 'AUTH', 'LOGOUT'), + } + +### needed +Oknobye = re.compile(r'(?P(OK|NO|BYE))' + r'( \((?P.*)\))?' + r'( (?P.*))?') +# draft-martin-managesieve-04.txt defines the size tag of literals to +# contain a '+' (plus sign) behind the digits, but timsieved does not +# send one. Thus we are less strikt here: +Literal = re.compile(r'.*{(?P\d+)\+?}$') +re_dquote = re.compile(r'"(([^"\\]|\\.)*)"') +re_esc_quote = re.compile(r'\\([\\"])') + + +class SSLFakeSocket: + """A fake socket object that really wraps a SSLObject. + + It only supports what is needed in managesieve. + """ + def __init__(self, realsock, sslobj): + self.realsock = realsock + self.sslobj = sslobj + + def send(self, str): + self.sslobj.write(str) + return len(str) + + sendall = send + + def close(self): + self.realsock.close() + +class SSLFakeFile: + """A fake file like object that really wraps a SSLObject. + + It only supports what is needed in managesieve. + """ + def __init__(self, sslobj): + self.sslobj = sslobj + + def readline(self): + str = "" + chr = None + while chr != "\n": + chr = self.sslobj.read(1) + str += chr + return str + + def read(self, size=0): + if size == 0: + return '' + else: + return self.sslobj.read(size) + + def close(self): + pass + + +def sieve_name(name): + # todo: correct quoting + return '"%s"' % name + +def sieve_string(string): + return '{%d+}%s%s' % ( len(string), CRLF, string ) + + +class MANAGESIEVE: + """Sieve client class. + + Instantiate with: MANAGESIEVE(host [, port]) + + host - host's name (default: localhost) + port - port number (default: standard Sieve port). + + use_tls - switch to TLS automatically, if server supports + keyfile - keyfile to use for TLS (optional) + certfile - certfile to use for TLS (optional) + + All Sieve commands are supported by methods of the same + name (in lower-case). + + Each command returns a tuple: (type, [data, ...]) where 'type' + is usually 'OK' or 'NO', and 'data' is either the text from the + tagged response, or untagged results from command. + + All arguments to commands are converted to strings, except for + AUTHENTICATE. + """ + + """ + However, the 'password' argument to the LOGIN command is always + quoted. If you want to avoid having an argument string quoted (eg: + the 'flags' argument to STORE) then enclose the string in + parentheses (eg: "(\Deleted)"). + + Errors raise the exception class .error(""). + IMAP4 server errors raise .abort(""), + which is a sub-class of 'error'. Mailbox status changes + from READ-WRITE to READ-ONLY raise the exception class + .readonly(""), which is a sub-class of 'abort'. + + "error" exceptions imply a program error. + "abort" exceptions imply the connection should be reset, and + the command re-tried. + "readonly" exceptions imply the command should be re-tried. + + Note: to use this module, you must read the RFCs pertaining + to the IMAP4 protocol, as the semantics of the arguments to + each IMAP4 command are left to the invoker, not to mention + the results. + """ + + class error(Exception): """Logical errors - debug required""" + class abort(error): """Service errors - close and retry""" + + def __clear_knowledge(self): + """clear/init any knowledge obtained from the server""" + self.capabilities = [] + self.loginmechs = [] + self.implementation = '' + self.supports_tls = 0 + + def __init__(self, host='', port=SIEVE_PORT, + use_tls=False, keyfile=None, certfile=None): + self.host = host + self.port = port + self.debug = Debug + self.state = 'NONAUTH' + + self.response_text = self.response_code = None + self.__clear_knowledge() + + # Open socket to server. + self._open(host, port) + + if __debug__: + self._cmd_log_len = 10 + self._cmd_log_idx = 0 + self._cmd_log = {} # Last `_cmd_log_len' interactions + if self.debug >= 1: + self._mesg('managesieve version %s' % __version__) + + # Get server welcome message, + # request and store CAPABILITY response. + typ, data = self._get_response() + if typ == 'OK': + self._parse_capabilities(data) + if use_tls and self.supports_tls: + typ, data = self.starttls(keyfile=keyfile, certfile=certfile) + if typ == 'OK': + self._parse_capabilities(data) + + + def _parse_capabilities(self, lines): + for line in lines: + if len(line) == 2: + typ, data = line + else: + assert len(line) == 1, 'Bad Capabilities line: %r' % line + typ = line[0] + data = None + if __debug__: + if self.debug >= 3: + self._mesg('%s: %r' % (typ, data)) + if typ == "IMPLEMENTATION": + self.implementation = data + elif typ == "SASL": + self.loginmechs = data.split() + elif typ == "SIEVE": + self.capabilities = data.split() + elif typ == "STARTTLS": + self.supports_tls = 1 + else: + # A client implementation MUST ignore any other + # capabilities given that it does not understand. + pass + return + + + def __getattr__(self, attr): + # Allow UPPERCASE variants of MANAGESIEVE command methods. + if commands.has_key(attr): + return getattr(self, attr.lower()) + raise AttributeError("Unknown MANAGESIEVE command: '%s'" % attr) + + + #### Private methods ### + def _open(self, host, port): + """Setup 'self.sock' and 'self.file'.""" + self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self.sock.connect((self.host, self.port)) + self.file = self.sock.makefile('r') + + def _close(self): + self.file.close() + self.sock.close() + + def _read(self, size): + """Read 'size' bytes from remote.""" + data = "" + while len(data) < size: + data += self.file.read(size - len(data)) + return data + + def _readline(self): + """Read line from remote.""" + return self.file.readline() + + def _send(self, data): + return self.sock.send(data) + + def _get_line(self): + line = self._readline() + if not line: + raise self.abort('socket error: EOF') + # Protocol mandates all lines terminated by CRLF + line = line[:-2] + if __debug__: + if self.debug >= 4: + self._mesg('< %s' % line) + else: + self._log('< %s' % line) + return line + + def _simple_command(self, *args): + """Execute a command which does only return status. + + Returns (typ) with + typ = response type + + The responce code and text may be found in .response_code + and .response_text, respectivly. + """ + return self._command(*args)[0] # only return typ, ignore data + + + def _command(self, name, arg1=None, arg2=None, *options): + """ + Returns (typ, data) with + typ = response type + data = list of lists of strings read (only meaningfull if OK) + + The responce code and text may be found in .response_code + and .response_text, respectivly. + """ + if self.state not in commands[name]: + raise self.error( + 'Command %s illegal in state %s' % (name, self.state)) + # concatinate command and arguments (if any) + data = " ".join(filter(None, (name, arg1, arg2))) + if __debug__: + if self.debug >= 4: self._mesg('> %r' % data) + else: self._log('> %s' % data) + try: + try: + self._send('%s%s' % (data, CRLF)) + for o in options: + if __debug__: + if self.debug >= 4: self._mesg('> %r' % o) + else: self._log('> %r' % data) + self._send('%s%s' % (o, CRLF)) + except (socket.error, OSError), val: + raise self.abort('socket error: %s' % val) + return self._get_response() + except self.abort, val: + if __debug__: + if self.debug >= 1: + self.print_log() + raise + + + def _readstring(self, data): + if data[0] == ' ': # space -> error + raise self.error('Unexpected space: %r' % data) + elif data[0] == '"': # handle double quote: + if not self._match(re_dquote, data): + raise self.error('Unmatched quote: %r' % data) + snippet = self.mo.group(1) + return re_esc_quote.sub(r'\1', snippet), data[self.mo.end():] + elif self._match(Literal, data): + # read a 'literal' string + size = int(self.mo.group('size')) + if __debug__: + if self.debug >= 4: + self._mesg('read literal size %s' % size) + return self._read(size), self._get_line() + else: + data = data.split(' ', 1) + if len(data) == 1: + data.append('') + return data + + def _get_response(self): + """ + Returns (typ, data) with + typ = response type + data = list of lists of strings read (only meaningfull if OK) + + The responce code and text may be found in .response_code + and .response_text, respectivly. + """ + + """ + response-deletescript = response-oknobye + response-authenticate = *(string CRLF) (response-oknobye) + response-capability = *(string [SP string] CRLF) response-oknobye + response-listscripts = *(string [SP "ACTIVE"] CRLF) response-oknobye + response-oknobye = ("OK" / "NO" / "BYE") [SP "(" resp-code ")"] [SP string] CRLF + string = quoted / literal + quoted = <"> *QUOTED-CHAR <"> + literal = "{" number "+}" CRLF *OCTET + ;; The number represents the number of octets + ;; MUST be literal-utf8 except for values + +--> a response either starts with a quote-charakter, a left-bracket or + OK, NO, BYE + +"quoted" CRLF +"quoted" SP "quoted" CRLF +{size} CRLF *OCTETS CRLF +{size} CRLF *OCTETS CRLF +[A-Z-]+ CRLF + + """ + data = [] ; dat = None + resp = self._get_line() + while 1: + if self._match(Oknobye, resp): + typ, code, dat = self.mo.group('type','code','data') + if __debug__: + if self.debug >= 1: + self._mesg('%s response: %s %s' % (typ, code, dat)) + self.response_code = code + self.response_text = None + if dat: + self.response_text = self._readstring(dat)[0] + + # if server quits here, send code instead of empty data + if typ == "BYE": + return typ, code + + return typ, data +## elif 0: +## dat2 = None +## dat, resp = self._readstring(resp) +## if resp.startswith(' '): +## dat2, resp = self._readstring(resp[1:]) +## data.append( (dat, dat2)) +## resp = self._get_line() + else: + dat = [] + while 1: + dat1, resp = self._readstring(resp) + if __debug__: + if self.debug >= 4: + self._mesg('read: %r' % (dat1,)) + if self.debug >= 5: + self._mesg('rest: %r' % (resp,)) + dat.append(dat1) + if not resp.startswith(' '): + break + resp = resp[1:] + if len(dat) == 1: + dat.append(None) + data.append(dat) + resp = self._get_line() + return self.error('Should not come here') + + + def _match(self, cre, s): + # Run compiled regular expression match method on 's'. + # Save result, return success. + self.mo = cre.match(s) + if __debug__: + if self.mo is not None and self.debug >= 5: + self._mesg("\tmatched r'%s' => %s" % (cre.pattern, `self.mo.groups()`)) + return self.mo is not None + + + if __debug__: + + def _mesg(self, s, secs=None): + if secs is None: + secs = time.time() + tm = time.strftime('%M:%S', time.localtime(secs)) + sys.stderr.write(' %s.%02d %s\n' % (tm, (secs*100)%100, s)) + sys.stderr.flush() + + def _log(self, line): + # Keep log of last `_cmd_log_len' interactions for debugging. + self._cmd_log[self._cmd_log_idx] = (line, time.time()) + self._cmd_log_idx += 1 + if self._cmd_log_idx >= self._cmd_log_len: + self._cmd_log_idx = 0 + + def print_log(self): + self.self._mesg('last %d SIEVE interactions:' % len(self._cmd_log)) + i, n = self._cmd_log_idx, self._cmd_log_len + while n: + try: + self.self._mesg(*self._cmd_log[i]) + except: + pass + i += 1 + if i >= self._cmd_log_len: + i = 0 + n -= 1 + + ### Public methods ### + def authenticate(self, mechanism, *authobjects): + """Authenticate command - requires response processing.""" + # command-authenticate = "AUTHENTICATE" SP auth-type [SP string] *(CRLF string) + # response-authenticate = *(string CRLF) (response-oknobye) + mech = mechanism.upper() + if not mech in self.loginmechs: + raise self.error("Server doesn't allow %s authentication." % mech) + + if mech == AUTH_LOGIN: + authobjects = [ sieve_name(binascii.b2a_base64(ao)[:-1]) + for ao in authobjects + ] + elif mech == AUTH_PLAIN: + if len(authobjects) < 3: + # assume authorization identity (authzid) is missing + # and these two authobjects are username and password + authobjects.insert(0, '') + ao = '\0'.join(authobjects) + ao = binascii.b2a_base64(ao)[:-1] + authobjects = [ sieve_string(ao) ] + else: + raise self.error("managesieve doesn't support %s authentication." % mech) + + typ, data = self._command('AUTHENTICATE', + sieve_name(mech), *authobjects) + if typ == 'OK': + self.state = 'AUTH' + return typ + + + def login(self, auth, user, password): + """ + Authenticate to the Sieve server using the best mechanism available. + """ + for authmech in AUTHMECHS: + if authmech in self.loginmechs: + authobjs = [auth, user, password] + if authmech == AUTH_LOGIN: + authobjs = [user, password] + return self.authenticate(authmech, *authobjs) + else: + raise self.abort('No matching authentication mechanism found.') + + def logout(self): + """Terminate connection to server.""" + # command-logout = "LOGOUT" CRLF + # response-logout = response-oknobye + typ = self._simple_command('LOGOUT') + self.state = 'LOGOUT' + self._close() + return typ + + + def listscripts(self): + """Get a list of scripts on the server. + + (typ, [data]) = .listscripts() + + if 'typ' is 'OK', 'data' is list of (scriptname, active) tuples. + """ + # command-listscripts = "LISTSCRIPTS" CRLF + # response-listscripts = *(sieve-name [SP "ACTIVE"] CRLF) response-oknobye + typ, data = self._command('LISTSCRIPTS') + if typ != 'OK': return typ, data + scripts = [] + for dat in data: + if __debug__: + if not len(dat) in (1, 2): + self.error("Unexpected result from LISTSCRIPTS: %r" (dat,)) + scripts.append( (dat[0], dat[1] is not None )) + return typ, scripts + + + def getscript(self, scriptname): + """Get a script from the server. + + (typ, scriptdata) = .getscript(scriptname) + + 'scriptdata' is the script data. + """ + # command-getscript = "GETSCRIPT" SP sieve-name CRLF + # response-getscript = [string CRLF] response-oknobye + + typ, data = self._command('GETSCRIPT', sieve_name(scriptname)) + if typ != 'OK': return typ, data + if len(data) != 1: + self.error('GETSCRIPT returned more than one string/script') + # todo: decode data? + return typ, data[0][0] + + + def putscript(self, scriptname, scriptdata): + """Put a script onto the server.""" + # command-putscript = "PUTSCRIPT" SP sieve-name SP string CRLF + # response-putscript = response-oknobye + return self._simple_command('PUTSCRIPT', + sieve_name(scriptname), + sieve_string(scriptdata) + ) + + def deletescript(self, scriptname): + """Delete a scripts at the server.""" + # command-deletescript = "DELETESCRIPT" SP sieve-name CRLF + # response-deletescript = response-oknobye + return self._simple_command('DELETESCRIPT', sieve_name(scriptname)) + + + def setactive(self, scriptname): + """Mark a script as the 'active' one.""" + # command-setactive = "SETACTIVE" SP sieve-name CRLF + # response-setactive = response-oknobye + return self._simple_command('SETACTIVE', sieve_name(scriptname)) + + + def havespace(self, scriptname, size): + # command-havespace = "HAVESPACE" SP sieve-name SP number CRLF + # response-havespace = response-oknobye + return self._simple_command('HAVESPACE', + sieve_name(scriptname), + str(size)) + + + def capability(self): + """ + Isse a CAPABILITY command and return the result. + + As a side-effect, on succes these attributes are (re)set: + self.implementation + self.loginmechs + self.capabilities + self.supports_tls + """ + # command-capability = "CAPABILITY" CRLF + # response-capability = *(string [SP string] CRLF) response-oknobye + typ, data = self._command('CAPABILITY') + if typ == 'OK': + self._parse_capabilities(data) + return typ, data + + + def starttls(self, keyfile=None, certfile=None): + """Puts the connection to the SIEVE server into TLS mode. + + If the server supports TLS, this will encrypt the rest of the SIEVE + session. If you provide the keyfile and certfile parameters, + the identity of the SIEVE server and client can be checked. This, + however, depends on whether the socket module really checks the + certificates. + """ + # command-starttls = "STARTTLS" CRLF + # response-starttls = response-oknobye + typ, data = self._command('STARTTLS') + if typ == 'OK': + sslobj = ssl_wrap_socket(self.sock, keyfile, certfile) + self.sock = SSLFakeSocket(self.sock, sslobj) + self.file = SSLFakeFile(sslobj) + # MUST discard knowledge obtained from the server + self.__clear_knowledge() + # Some servers send capabilities after TLS handshake, some + # do not. We send a bogus command, and expect a NO. If you + # get something else instead, read the extra NO to clear + # the buffer. + typ, data = self._command('BOGUS') + if typ != 'NO': + typ, data = self._get_response() + # server may not advertise capabilities, thus we need to ask + self.capability() + if self.debug >= 3: self._mesg('started Transport Layer Security (TLS)') + return typ, data diff --git a/Tests/Integration/preferences.py b/Tests/Integration/preferences.py index aa64875b9..52786408f 100644 --- a/Tests/Integration/preferences.py +++ b/Tests/Integration/preferences.py @@ -10,6 +10,7 @@ SOGoSupportedLanguages = [ "Catalan", "Czech", "Welsh", "English", "Spanish", "French", "German", "Italian", "Hungarian", "Dutch", "BrazilianPortuguese", "Norwegian", "Polish", "Russian", "Ukrainian", "Swedish" ] +daysBetweenResponseList=[1,2,3,5,7,14,21,30] class HTTPPreferencesPOST (webdavlib.HTTPPOST): cookie = None @@ -43,12 +44,35 @@ class preferences: authCookie = sogoLogin.getAuthCookie(hostname, port, username, password) self.cookie = authCookie - self.preferencesMap = {"SOGoLanguage": "2.1.0.3.0.1.4.3.1.3.1.1.2"} - # Duplicated from SOGoDefaults.plist + self.preferencesMap = { + "SOGoLanguage": "2.1.0.3.0.1.4.3.1.3.1.1.2", + "SOGoSieveFilters": "sieveFilters", + + # Vacation stuff + "Vacation": "enableVacation", # to disable, don't specify it + "autoReplyText": "autoReplyText", # string + "autoReplyEmailAddresses": "autoReplyEmailAddresses", # LIST + "daysBetweenResponse": "2.1.0.3.0.1.4.3.1.3.7.1.5.1.1.3.7.2", # see daysBetweenResponseList + "ignoreLists": "ignoreLists", #bool + + # forward stuff + "Forward": "enableForward", # to disable, don't specify it + "forwardAddress": "forwardAddress", + "keepCopy": "forwardKeepCopy", + } + + def set(self, preference, value=None): + # if preference is a dict, set all prefs found in the dict + content="" + try: + for k,v in preference.items(): + content+="%s=%s&" % (self.preferencesMap[k], v) + except AttributeError: + # preference wasn't a dict + formKey = self.preferencesMap[preference] + content = "%s=%s&hasChanged=1" % (formKey, value) + - def set(self, preference, value): - formKey = self.preferencesMap[preference] - content = "%s=%s&hasChanged=1" % (formKey, value) url = "/SOGo/so/%s/preferences" % self.login post = HTTPPreferencesPOST (url, content) @@ -79,3 +103,4 @@ class preferences: if __name__ == "__main__": p = preferences () p.set ("SOGoLanguage", SOGoSupportedLanguages.index("French")) + print p.get ("SOGoLanguage") diff --git a/Tests/Integration/test-caldav-scheduling.py b/Tests/Integration/test-caldav-scheduling.py index 4588770f0..a9fb92249 100755 --- a/Tests/Integration/test-caldav-scheduling.py +++ b/Tests/Integration/test-caldav-scheduling.py @@ -3,12 +3,15 @@ # setup: username must be super-user or have read-access to PUBLIC events in # both attendee and delegate's personal calendar -from config import hostname, port, username, password, attendee1, attendee1_delegate +from config import hostname, port, username, password, \ + attendee1, attendee1_delegate, \ + resource_no_overbook, resource_can_overbook import datetime import sogotests import sys import time +import pytz import unittest import utilities import vobject @@ -103,30 +106,13 @@ class CalDAVITIPDelegationTest(unittest.TestCase): (self.user_name, self.user_email) = utility.fetchUserInfo(username) (self.attendee1_name, self.attendee1_email) = utility.fetchUserInfo(attendee1) (self.attendee1_delegate_name, self.attendee1_delegate_email) = utility.fetchUserInfo(attendee1_delegate) + (self.res_no_ob_name, self.res_no_ob_email) = utility.fetchUserInfo(resource_no_overbook) + (self.res_can_ob_name, self.res_can_ob_email) = utility.fetchUserInfo(resource_can_overbook) self.user_calendar = "/SOGo/dav/%s/Calendar/personal/" % username self.attendee1_calendar = "/SOGo/dav/%s/Calendar/personal/" % attendee1 self.attendee1_delegate_calendar = "/SOGo/dav/%s/Calendar/personal/" % attendee1_delegate - def _newEvent(self): - newCal = vobject.iCalendar() - vevent = newCal.add('vevent') - vevent.add('summary').value = "test event" - vevent.add('transp').value = "OPAQUE" - - now = datetime.datetime.now() - startdate = vevent.add('dtstart') - startdate.value = now - enddate = vevent.add('dtend') - enddate.value = now + datetime.timedelta(0, 3600) - vevent.add('uid').value = "test-delegation" - vevent.add('dtstamp').value = now - vevent.add('last-modified').value = now - vevent.add('created').value = now - - vevent.add('sequence').value = "0" - - return newCal def tearDown(self): self._deleteEvent(self.client, @@ -136,6 +122,40 @@ class CalDAVITIPDelegationTest(unittest.TestCase): self._deleteEvent(self.client, "%stest-delegation.ics" % self.attendee1_delegate_calendar, None) + self._deleteEvent(self.client, + "%stest-add-attendee.ics" % self.user_calendar, None) + self._deleteEvent(self.client, + "%stest-add-attendee.ics" % self.attendee1_calendar, None) + self._deleteEvent(self.client, + "%stest-no-overbook.ics" % self.user_calendar, None) + self._deleteEvent(self.client, + "%stest-no-overbook-overlap.ics" % self.user_calendar, None) + self._deleteEvent(self.client, + "%stest-can-overbook.ics" % self.user_calendar, None) + self._deleteEvent(self.client, + "%stest-can-overbook-overlap.ics" % self.user_calendar, None) + + def _newEvent(self, summary="test event", uid="test", transp=0): + transparency = ("OPAQUE", "TRANSPARENT") + + newCal = vobject.iCalendar() + vevent = newCal.add('vevent') + vevent.add('summary').value = summary + vevent.add('transp').value = transparency[transp] + + now = datetime.datetime.now(pytz.timezone("America/Montreal")) + startdate = vevent.add('dtstart') + startdate.value = now + enddate = vevent.add('dtend') + enddate.value = now + datetime.timedelta(0, 3600) + vevent.add('uid').value = uid + vevent.add('dtstamp').value = now + vevent.add('last-modified').value = now + vevent.add('created').value = now + + vevent.add('sequence').value = "0" + + return newCal def _putEvent(self, client, filename, event, exp_status = 201): put = webdavlib.HTTPPUT(filename, event.serialize()) @@ -218,6 +238,118 @@ class CalDAVITIPDelegationTest(unittest.TestCase): % (email, compared_attendees[email], attendees[email])) + def testAddAttendee(self): + """ add attendee after event creation """ + + # make sure the event doesn't exist + ics_name = "test-add-attendee.ics" + self._deleteEvent(self.client, + "%s%s" % (self.user_calendar,ics_name), None) + self._deleteEvent(self.client, + "%s%s" % (self.attendee1_calendar,ics_name), None) + + # 1. create an event in the organiser's calendar + event = self._newEvent(summary="Test add attendee", uid="Test add attendee") + organizer = event.vevent.add('organizer') + organizer.cn_param = self.user_name + organizer.value = self.user_email + self._putEvent(self.client, "%s%s" % (self.user_calendar, ics_name), event) + + # 2. add an attendee + event.add("method").value = "REQUEST" + attendee = event.vevent.add('attendee') + attendee.cn_param = self.attendee1_name + attendee.rsvp_param = "TRUE" + attendee.partstat_param = "NEEDS-ACTION" + attendee.value = self.attendee1_email + self._putEvent(self.client, "%s%s" % (self.user_calendar, ics_name), event, + exp_status=204) + + + # 3. verify that the attendee has the event + attendee_event = self._getEvent(self.client, "%s%s" % (self.attendee1_calendar, ics_name)) + + # 4. make sure the received event match the original one + # XXX is this enough? + self.assertEquals(event.vevent.uid, attendee_event.vevent.uid) + + def testResourceNoOverbook(self): + """ try to overbook a resource """ + + # make sure the event doesn't exist + ics_name = "test-no-overbook.ics" + self._deleteEvent(self.client, + "%s%s" % (self.user_calendar,ics_name), None) + + ob_ics_name = "test-no-overbook-overlap.ics" + self._deleteEvent(self.client, + "%s%s" % (self.user_calendar,ics_name), None) + + # 1. create an event in the organiser's calendar + event = self._newEvent(summary="Test no overbook", uid="test no overbook") + organizer = event.vevent.add('organizer') + organizer.cn_param = self.user_name + organizer.value = self.user_email + attendee = event.vevent.add('attendee') + attendee.cn_param = self.res_no_ob_name + attendee.rsvp_param = "TRUE" + attendee.partstat_param = "NEEDS-ACTION" + attendee.value = self.res_no_ob_email + self._putEvent(self.client, "%s%s" % (self.user_calendar, ics_name), event) + + # 2. create a second event overlapping the first one + event = self._newEvent(summary="Test no overbook - overlap", uid="test no overbook - overlap") + organizer = event.vevent.add('organizer') + organizer.cn_param = self.user_name + organizer.value = self.user_email + attendee = event.vevent.add('attendee') + attendee.cn_param = self.res_no_ob_name + attendee.rsvp_param = "TRUE" + attendee.partstat_param = "NEEDS-ACTION" + attendee.value = self.res_no_ob_email + + # put the event - should trigger a 403 + self._putEvent(self.client, "%s%s" % (self.user_calendar, ob_ics_name), event, exp_status=403) + + def testResourceCanOverbook(self): + """ try to overbook a resource - multiplebookings=0""" + + # make sure the event doesn't exist + ics_name = "test-can-overbook.ics" + self._deleteEvent(self.client, + "%s%s" % (self.user_calendar,ics_name), None) + + ob_ics_name = "test-can-overbook-overlap.ics" + self._deleteEvent(self.client, + "%s%s" % (self.user_calendar,ob_ics_name), None) + + # 1. create an event in the organiser's calendar + event = self._newEvent(summary="Test can overbook", uid="test can overbook") + organizer = event.vevent.add('organizer') + organizer.cn_param = self.user_name + organizer.value = self.user_email + attendee = event.vevent.add('attendee') + attendee.cn_param = self.res_can_ob_name + attendee.rsvp_param = "TRUE" + attendee.partstat_param = "NEEDS-ACTION" + attendee.value = self.res_can_ob_email + self._putEvent(self.client, "%s%s" % (self.user_calendar, ics_name), event) + + # 2. create a second event overlapping the first one + event = self._newEvent(summary="Test can overbook - overlap", uid="test can overbook - overlap") + organizer = event.vevent.add('organizer') + organizer.cn_param = self.user_name + organizer.value = self.user_email + attendee = event.vevent.add('attendee') + attendee.cn_param = self.res_can_ob_name + attendee.rsvp_param = "TRUE" + attendee.partstat_param = "NEEDS-ACTION" + attendee.value = self.res_can_ob_email + + # put the event - should be fine since we can overbook this one + self._putEvent(self.client, "%s%s" % (self.user_calendar, ob_ics_name), event) + + def testInvitationDelegation(self): """ invitation delegation """ diff --git a/Tests/Integration/test-sieve.py b/Tests/Integration/test-sieve.py new file mode 100644 index 000000000..e9594c0da --- /dev/null +++ b/Tests/Integration/test-sieve.py @@ -0,0 +1,154 @@ +#!/usr/bin/python +from config import hostname, port, username, password, sieve_port, sieve_server + +import managesieve +import preferences +import sogotests +import unittest +import utilities +import webdavlib + +sieve_simple_vacation="""require ["vacation"];\r\nvacation :days %(days)d :addresses ["%(mailaddr)s"] text:\r\n%(vacation_msg)s\r\n.\r\n;\r\n""" +sieve_vacation_ignoreLists="""require ["vacation"];\r\nif allof ( not exists ["list-help", "list-unsubscribe", "list-subscribe", "list-owner", "list-post", "list-archive", "list-id", "Mailing-List"], not header :comparator "i;ascii-casemap" :is "Precedence" ["list", "bulk", "junk"], not header :comparator "i;ascii-casemap" :matches "To" "Multiple recipients of*" ) {vacation :days %(days)d :addresses ["%(mailaddr)s"] text:\r\n%(vacation_msg)s\r\n.\r\n;\r\n}\r\n""" +sieve_simple_forward="""redirect "%(redirect_mailaddr)s";\r\n""" +sieve_forward_keep="""redirect "%(redirect_mailaddr)s";\r\nkeep;\r\n""" +sieve_simple_filter="""require ["fileinto"];\r\nif anyof (header :contains "subject" "%(subject)s") {\r\n fileinto "%(folderName)s";\r\n}\r\n""" + +class sieveTest(unittest.TestCase): + def _killFilters(self): + filtersKill={} + # kill existing filters + filtersKill["SOGoSieveFilters"]= "[]" + # vacation filters + filtersKill["autoReplyText"] = "" + filtersKill["autoReplyEmailAddresses"] = "" + # forwarding filters + filtersKill["forwardAddress"] = "" + + self.prefs=preferences.preferences() + self.prefs.set(filtersKill) + + def setUp(self): + ret = "" + + self.client = webdavlib.WebDAVClient(hostname, port, + username, password) + utility = utilities.TestUtility(self, self.client) + (self.user_name, self.user_email) = utility.fetchUserInfo(username) + self.user_email = self.user_email.replace("mailto:", "") + + self.ms = managesieve.MANAGESIEVE(sieve_server, sieve_port) + self.assertEqual(self.ms.login("", username, password), "OK", + "Couldn't login") + + self._killFilters() + + def tearDown(self): + self._killFilters() + + def _getSogoSieveScript(self): + sieveFoundsogo=0 + createSieveScript="" + (ret, sieveScriptList) = self.ms.listscripts() + self.assertEqual(ret, "OK", "Couldn't get sieve script list") + + for (script, isActive) in sieveScriptList: + if (script == "sogo"): + sieveFoundsogo=1 + self.assertEqual(isActive, True, "sogo sieve script is not active!") + (ret, createdSieveScript) = self.ms.getscript(script) + self.assertEqual(ret, "OK", "Couldn't get sogo sieve script") + + self.assertEqual(sieveFoundsogo, 1, "sogo sieve script not found!") + + return createdSieveScript + + def testSieveSimpleVacation(self): + """ enable simple vacation script """ + vacation_msg="vacation test" + daysSelect=4 + + sieveScript = sieve_simple_vacation % { "mailaddr": self.user_email, + "vacation_msg": vacation_msg, + "days": preferences.daysBetweenResponseList[daysSelect], + } + + filterAdd = {"Vacation":1, + "autoReplyText": vacation_msg, + "daysBetweenResponse": daysSelect, + "autoReplyEmailAddresses": self.user_email, + } + + self.prefs.set(filterAdd) + createdSieveScript=self._getSogoSieveScript() + + self.assertEqual(sieveScript, createdSieveScript) + + def testSieveVacationIgnoreLists(self): + """ enable vacation script - ignore lists""" + vacation_msg="vacation test - ignore list" + daysSelect=4 + + sieveScript = sieve_vacation_ignoreLists % { "mailaddr": self.user_email, + "vacation_msg": vacation_msg, + "days": preferences.daysBetweenResponseList[daysSelect], + } + + filterAdd = {"Vacation":1, + "autoReplyText": vacation_msg, + "daysBetweenResponse": daysSelect, + "autoReplyEmailAddresses": self.user_email, + "ignoreLists": 1, + } + + self.prefs.set(filterAdd) + createdSieveScript=self._getSogoSieveScript() + + self.assertEqual(sieveScript, createdSieveScript) + + def testSieveSimpleForward(self): + """ enable simple forwarding """ + redirect_mailaddr="nonexistent@inverse.com" + + sieveScript = sieve_simple_forward % { "redirect_mailaddr": redirect_mailaddr } + + filterAdd = { "Forward": 1, + "forwardAddress": redirect_mailaddr, + } + + self.prefs.set(filterAdd) + createdSieveScript = self._getSogoSieveScript() + self.assertEqual(sieveScript, createdSieveScript) + + def testSieveForwardKeepCopy(self): + """ enable email forwarding - keep a copy """ + redirect_mailaddr="nonexistent@inverse.com" + + sieveScript = sieve_forward_keep % { "redirect_mailaddr": redirect_mailaddr } + + filterAdd = { "Forward": 1, + "forwardAddress": redirect_mailaddr, + "keepCopy": 1, + } + + self.prefs.set(filterAdd) + createdSieveScript = self._getSogoSieveScript() + self.assertEqual(sieveScript, createdSieveScript) + + def testSieveSimpleFilter(self): + """ add simple sieve filter """ + folderName="Sent" + subject=__name__ + + sieveScript=sieve_simple_filter % { "subject": subject, "folderName": folderName } + + filterAdd = { "SOGoSieveFilters": """[{"active": true, "actions": [{"method": "fileinto", "argument": "Sent"}], "rules": [{"operator": "contains", "field": "subject", "value": "%s"}], "match": "any", "name": "%s"}]""" % (subject, folderName) + } + + self.prefs.set(filterAdd) + createdSieveScript = self._getSogoSieveScript() + self.assertEqual(sieveScript, createdSieveScript) + + +if __name__ == "__main__": + sogotests.runTests()