diff --git a/Tests/Integration/config.py.in b/Tests/Integration/config.py.in index c3c4a0a35..a70fc0595 100644 --- a/Tests/Integration/config.py.in +++ b/Tests/Integration/config.py.in @@ -12,6 +12,12 @@ subscriber_password = "otherpass" attendee1 = "user@domain.com" attendee1_delegate = "otheruser@domain.com" +resource_no_overbook = "res" +resource_can_overbook = "res-nolimit" + mailserver = "imaphost" testput_nbrdays = 30 + +sieve_server = "localhost" +sieve_port = 2000 diff --git a/Tests/Integration/managesieve.py b/Tests/Integration/managesieve.py new file mode 100644 index 000000000..b248b13c5 --- /dev/null +++ b/Tests/Integration/managesieve.py @@ -0,0 +1,631 @@ +"""Sieve management client. + +A Protocol for Remotely Managing Sieve Scripts +Based on +""" + +__version__ = "0.4.2" +__author__ = """Hartmut Goebel +Ulrich Eck April 2001 +""" + +import binascii, re, socket, time, random, sys +try: + import ssl + ssl_wrap_socket = ssl.wrap_socket +except ImportError: + ssl_wrap_socket = socket.ssl + +__all__ = [ 'MANAGESIEVE', 'SIEVE_PORT', 'OK', 'NO', 'BYE', 'Debug'] + +Debug = 0 +CRLF = '\r\n' +SIEVE_PORT = 2000 + +OK = 'OK' +NO = 'NO' +BYE = 'BYE' + +AUTH_PLAIN = "PLAIN" +AUTH_LOGIN = "LOGIN" +# authentication mechanisms currently supported +# in order of preference +AUTHMECHS = [AUTH_PLAIN, AUTH_LOGIN] + +# todo: return results or raise exceptions? +# todo: on result 'BYE' quit immediatly +# todo: raise exception on 'BYE'? + +# Commands +commands = { + # name valid states + 'STARTTLS': ('NONAUTH',), + 'AUTHENTICATE': ('NONAUTH',), + 'LOGOUT': ('NONAUTH', 'AUTH', 'LOGOUT'), + 'CAPABILITY': ('NONAUTH', 'AUTH'), + 'GETSCRIPT': ('AUTH', ), + 'PUTSCRIPT': ('AUTH', ), + 'SETACTIVE': ('AUTH', ), + 'DELETESCRIPT': ('AUTH', ), + 'LISTSCRIPTS': ('AUTH', ), + 'HAVESPACE': ('AUTH', ), + # bogus command to receive a NO after STARTTLS (see starttls() ) + 'BOGUS': ('NONAUTH', 'AUTH', 'LOGOUT'), + } + +### needed +Oknobye = re.compile(r'(?P(OK|NO|BYE))' + r'( \((?P.*)\))?' + r'( (?P.*))?') +# draft-martin-managesieve-04.txt defines the size tag of literals to +# contain a '+' (plus sign) behind the digits, but timsieved does not +# send one. Thus we are less strikt here: +Literal = re.compile(r'.*{(?P\d+)\+?}$') +re_dquote = re.compile(r'"(([^"\\]|\\.)*)"') +re_esc_quote = re.compile(r'\\([\\"])') + + +class SSLFakeSocket: + """A fake socket object that really wraps a SSLObject. + + It only supports what is needed in managesieve. + """ + def __init__(self, realsock, sslobj): + self.realsock = realsock + self.sslobj = sslobj + + def send(self, str): + self.sslobj.write(str) + return len(str) + + sendall = send + + def close(self): + self.realsock.close() + +class SSLFakeFile: + """A fake file like object that really wraps a SSLObject. + + It only supports what is needed in managesieve. + """ + def __init__(self, sslobj): + self.sslobj = sslobj + + def readline(self): + str = "" + chr = None + while chr != "\n": + chr = self.sslobj.read(1) + str += chr + return str + + def read(self, size=0): + if size == 0: + return '' + else: + return self.sslobj.read(size) + + def close(self): + pass + + +def sieve_name(name): + # todo: correct quoting + return '"%s"' % name + +def sieve_string(string): + return '{%d+}%s%s' % ( len(string), CRLF, string ) + + +class MANAGESIEVE: + """Sieve client class. + + Instantiate with: MANAGESIEVE(host [, port]) + + host - host's name (default: localhost) + port - port number (default: standard Sieve port). + + use_tls - switch to TLS automatically, if server supports + keyfile - keyfile to use for TLS (optional) + certfile - certfile to use for TLS (optional) + + All Sieve commands are supported by methods of the same + name (in lower-case). + + Each command returns a tuple: (type, [data, ...]) where 'type' + is usually 'OK' or 'NO', and 'data' is either the text from the + tagged response, or untagged results from command. + + All arguments to commands are converted to strings, except for + AUTHENTICATE. + """ + + """ + However, the 'password' argument to the LOGIN command is always + quoted. If you want to avoid having an argument string quoted (eg: + the 'flags' argument to STORE) then enclose the string in + parentheses (eg: "(\Deleted)"). + + Errors raise the exception class .error(""). + IMAP4 server errors raise .abort(""), + which is a sub-class of 'error'. Mailbox status changes + from READ-WRITE to READ-ONLY raise the exception class + .readonly(""), which is a sub-class of 'abort'. + + "error" exceptions imply a program error. + "abort" exceptions imply the connection should be reset, and + the command re-tried. + "readonly" exceptions imply the command should be re-tried. + + Note: to use this module, you must read the RFCs pertaining + to the IMAP4 protocol, as the semantics of the arguments to + each IMAP4 command are left to the invoker, not to mention + the results. + """ + + class error(Exception): """Logical errors - debug required""" + class abort(error): """Service errors - close and retry""" + + def __clear_knowledge(self): + """clear/init any knowledge obtained from the server""" + self.capabilities = [] + self.loginmechs = [] + self.implementation = '' + self.supports_tls = 0 + + def __init__(self, host='', port=SIEVE_PORT, + use_tls=False, keyfile=None, certfile=None): + self.host = host + self.port = port + self.debug = Debug + self.state = 'NONAUTH' + + self.response_text = self.response_code = None + self.__clear_knowledge() + + # Open socket to server. + self._open(host, port) + + if __debug__: + self._cmd_log_len = 10 + self._cmd_log_idx = 0 + self._cmd_log = {} # Last `_cmd_log_len' interactions + if self.debug >= 1: + self._mesg('managesieve version %s' % __version__) + + # Get server welcome message, + # request and store CAPABILITY response. + typ, data = self._get_response() + if typ == 'OK': + self._parse_capabilities(data) + if use_tls and self.supports_tls: + typ, data = self.starttls(keyfile=keyfile, certfile=certfile) + if typ == 'OK': + self._parse_capabilities(data) + + + def _parse_capabilities(self, lines): + for line in lines: + if len(line) == 2: + typ, data = line + else: + assert len(line) == 1, 'Bad Capabilities line: %r' % line + typ = line[0] + data = None + if __debug__: + if self.debug >= 3: + self._mesg('%s: %r' % (typ, data)) + if typ == "IMPLEMENTATION": + self.implementation = data + elif typ == "SASL": + self.loginmechs = data.split() + elif typ == "SIEVE": + self.capabilities = data.split() + elif typ == "STARTTLS": + self.supports_tls = 1 + else: + # A client implementation MUST ignore any other + # capabilities given that it does not understand. + pass + return + + + def __getattr__(self, attr): + # Allow UPPERCASE variants of MANAGESIEVE command methods. + if commands.has_key(attr): + return getattr(self, attr.lower()) + raise AttributeError("Unknown MANAGESIEVE command: '%s'" % attr) + + + #### Private methods ### + def _open(self, host, port): + """Setup 'self.sock' and 'self.file'.""" + self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self.sock.connect((self.host, self.port)) + self.file = self.sock.makefile('r') + + def _close(self): + self.file.close() + self.sock.close() + + def _read(self, size): + """Read 'size' bytes from remote.""" + data = "" + while len(data) < size: + data += self.file.read(size - len(data)) + return data + + def _readline(self): + """Read line from remote.""" + return self.file.readline() + + def _send(self, data): + return self.sock.send(data) + + def _get_line(self): + line = self._readline() + if not line: + raise self.abort('socket error: EOF') + # Protocol mandates all lines terminated by CRLF + line = line[:-2] + if __debug__: + if self.debug >= 4: + self._mesg('< %s' % line) + else: + self._log('< %s' % line) + return line + + def _simple_command(self, *args): + """Execute a command which does only return status. + + Returns (typ) with + typ = response type + + The responce code and text may be found in .response_code + and .response_text, respectivly. + """ + return self._command(*args)[0] # only return typ, ignore data + + + def _command(self, name, arg1=None, arg2=None, *options): + """ + Returns (typ, data) with + typ = response type + data = list of lists of strings read (only meaningfull if OK) + + The responce code and text may be found in .response_code + and .response_text, respectivly. + """ + if self.state not in commands[name]: + raise self.error( + 'Command %s illegal in state %s' % (name, self.state)) + # concatinate command and arguments (if any) + data = " ".join(filter(None, (name, arg1, arg2))) + if __debug__: + if self.debug >= 4: self._mesg('> %r' % data) + else: self._log('> %s' % data) + try: + try: + self._send('%s%s' % (data, CRLF)) + for o in options: + if __debug__: + if self.debug >= 4: self._mesg('> %r' % o) + else: self._log('> %r' % data) + self._send('%s%s' % (o, CRLF)) + except (socket.error, OSError), val: + raise self.abort('socket error: %s' % val) + return self._get_response() + except self.abort, val: + if __debug__: + if self.debug >= 1: + self.print_log() + raise + + + def _readstring(self, data): + if data[0] == ' ': # space -> error + raise self.error('Unexpected space: %r' % data) + elif data[0] == '"': # handle double quote: + if not self._match(re_dquote, data): + raise self.error('Unmatched quote: %r' % data) + snippet = self.mo.group(1) + return re_esc_quote.sub(r'\1', snippet), data[self.mo.end():] + elif self._match(Literal, data): + # read a 'literal' string + size = int(self.mo.group('size')) + if __debug__: + if self.debug >= 4: + self._mesg('read literal size %s' % size) + return self._read(size), self._get_line() + else: + data = data.split(' ', 1) + if len(data) == 1: + data.append('') + return data + + def _get_response(self): + """ + Returns (typ, data) with + typ = response type + data = list of lists of strings read (only meaningfull if OK) + + The responce code and text may be found in .response_code + and .response_text, respectivly. + """ + + """ + response-deletescript = response-oknobye + response-authenticate = *(string CRLF) (response-oknobye) + response-capability = *(string [SP string] CRLF) response-oknobye + response-listscripts = *(string [SP "ACTIVE"] CRLF) response-oknobye + response-oknobye = ("OK" / "NO" / "BYE") [SP "(" resp-code ")"] [SP string] CRLF + string = quoted / literal + quoted = <"> *QUOTED-CHAR <"> + literal = "{" number "+}" CRLF *OCTET + ;; The number represents the number of octets + ;; MUST be literal-utf8 except for values + +--> a response either starts with a quote-charakter, a left-bracket or + OK, NO, BYE + +"quoted" CRLF +"quoted" SP "quoted" CRLF +{size} CRLF *OCTETS CRLF +{size} CRLF *OCTETS CRLF +[A-Z-]+ CRLF + + """ + data = [] ; dat = None + resp = self._get_line() + while 1: + if self._match(Oknobye, resp): + typ, code, dat = self.mo.group('type','code','data') + if __debug__: + if self.debug >= 1: + self._mesg('%s response: %s %s' % (typ, code, dat)) + self.response_code = code + self.response_text = None + if dat: + self.response_text = self._readstring(dat)[0] + + # if server quits here, send code instead of empty data + if typ == "BYE": + return typ, code + + return typ, data +## elif 0: +## dat2 = None +## dat, resp = self._readstring(resp) +## if resp.startswith(' '): +## dat2, resp = self._readstring(resp[1:]) +## data.append( (dat, dat2)) +## resp = self._get_line() + else: + dat = [] + while 1: + dat1, resp = self._readstring(resp) + if __debug__: + if self.debug >= 4: + self._mesg('read: %r' % (dat1,)) + if self.debug >= 5: + self._mesg('rest: %r' % (resp,)) + dat.append(dat1) + if not resp.startswith(' '): + break + resp = resp[1:] + if len(dat) == 1: + dat.append(None) + data.append(dat) + resp = self._get_line() + return self.error('Should not come here') + + + def _match(self, cre, s): + # Run compiled regular expression match method on 's'. + # Save result, return success. + self.mo = cre.match(s) + if __debug__: + if self.mo is not None and self.debug >= 5: + self._mesg("\tmatched r'%s' => %s" % (cre.pattern, `self.mo.groups()`)) + return self.mo is not None + + + if __debug__: + + def _mesg(self, s, secs=None): + if secs is None: + secs = time.time() + tm = time.strftime('%M:%S', time.localtime(secs)) + sys.stderr.write(' %s.%02d %s\n' % (tm, (secs*100)%100, s)) + sys.stderr.flush() + + def _log(self, line): + # Keep log of last `_cmd_log_len' interactions for debugging. + self._cmd_log[self._cmd_log_idx] = (line, time.time()) + self._cmd_log_idx += 1 + if self._cmd_log_idx >= self._cmd_log_len: + self._cmd_log_idx = 0 + + def print_log(self): + self.self._mesg('last %d SIEVE interactions:' % len(self._cmd_log)) + i, n = self._cmd_log_idx, self._cmd_log_len + while n: + try: + self.self._mesg(*self._cmd_log[i]) + except: + pass + i += 1 + if i >= self._cmd_log_len: + i = 0 + n -= 1 + + ### Public methods ### + def authenticate(self, mechanism, *authobjects): + """Authenticate command - requires response processing.""" + # command-authenticate = "AUTHENTICATE" SP auth-type [SP string] *(CRLF string) + # response-authenticate = *(string CRLF) (response-oknobye) + mech = mechanism.upper() + if not mech in self.loginmechs: + raise self.error("Server doesn't allow %s authentication." % mech) + + if mech == AUTH_LOGIN: + authobjects = [ sieve_name(binascii.b2a_base64(ao)[:-1]) + for ao in authobjects + ] + elif mech == AUTH_PLAIN: + if len(authobjects) < 3: + # assume authorization identity (authzid) is missing + # and these two authobjects are username and password + authobjects.insert(0, '') + ao = '\0'.join(authobjects) + ao = binascii.b2a_base64(ao)[:-1] + authobjects = [ sieve_string(ao) ] + else: + raise self.error("managesieve doesn't support %s authentication." % mech) + + typ, data = self._command('AUTHENTICATE', + sieve_name(mech), *authobjects) + if typ == 'OK': + self.state = 'AUTH' + return typ + + + def login(self, auth, user, password): + """ + Authenticate to the Sieve server using the best mechanism available. + """ + for authmech in AUTHMECHS: + if authmech in self.loginmechs: + authobjs = [auth, user, password] + if authmech == AUTH_LOGIN: + authobjs = [user, password] + return self.authenticate(authmech, *authobjs) + else: + raise self.abort('No matching authentication mechanism found.') + + def logout(self): + """Terminate connection to server.""" + # command-logout = "LOGOUT" CRLF + # response-logout = response-oknobye + typ = self._simple_command('LOGOUT') + self.state = 'LOGOUT' + self._close() + return typ + + + def listscripts(self): + """Get a list of scripts on the server. + + (typ, [data]) = .listscripts() + + if 'typ' is 'OK', 'data' is list of (scriptname, active) tuples. + """ + # command-listscripts = "LISTSCRIPTS" CRLF + # response-listscripts = *(sieve-name [SP "ACTIVE"] CRLF) response-oknobye + typ, data = self._command('LISTSCRIPTS') + if typ != 'OK': return typ, data + scripts = [] + for dat in data: + if __debug__: + if not len(dat) in (1, 2): + self.error("Unexpected result from LISTSCRIPTS: %r" (dat,)) + scripts.append( (dat[0], dat[1] is not None )) + return typ, scripts + + + def getscript(self, scriptname): + """Get a script from the server. + + (typ, scriptdata) = .getscript(scriptname) + + 'scriptdata' is the script data. + """ + # command-getscript = "GETSCRIPT" SP sieve-name CRLF + # response-getscript = [string CRLF] response-oknobye + + typ, data = self._command('GETSCRIPT', sieve_name(scriptname)) + if typ != 'OK': return typ, data + if len(data) != 1: + self.error('GETSCRIPT returned more than one string/script') + # todo: decode data? + return typ, data[0][0] + + + def putscript(self, scriptname, scriptdata): + """Put a script onto the server.""" + # command-putscript = "PUTSCRIPT" SP sieve-name SP string CRLF + # response-putscript = response-oknobye + return self._simple_command('PUTSCRIPT', + sieve_name(scriptname), + sieve_string(scriptdata) + ) + + def deletescript(self, scriptname): + """Delete a scripts at the server.""" + # command-deletescript = "DELETESCRIPT" SP sieve-name CRLF + # response-deletescript = response-oknobye + return self._simple_command('DELETESCRIPT', sieve_name(scriptname)) + + + def setactive(self, scriptname): + """Mark a script as the 'active' one.""" + # command-setactive = "SETACTIVE" SP sieve-name CRLF + # response-setactive = response-oknobye + return self._simple_command('SETACTIVE', sieve_name(scriptname)) + + + def havespace(self, scriptname, size): + # command-havespace = "HAVESPACE" SP sieve-name SP number CRLF + # response-havespace = response-oknobye + return self._simple_command('HAVESPACE', + sieve_name(scriptname), + str(size)) + + + def capability(self): + """ + Isse a CAPABILITY command and return the result. + + As a side-effect, on succes these attributes are (re)set: + self.implementation + self.loginmechs + self.capabilities + self.supports_tls + """ + # command-capability = "CAPABILITY" CRLF + # response-capability = *(string [SP string] CRLF) response-oknobye + typ, data = self._command('CAPABILITY') + if typ == 'OK': + self._parse_capabilities(data) + return typ, data + + + def starttls(self, keyfile=None, certfile=None): + """Puts the connection to the SIEVE server into TLS mode. + + If the server supports TLS, this will encrypt the rest of the SIEVE + session. If you provide the keyfile and certfile parameters, + the identity of the SIEVE server and client can be checked. This, + however, depends on whether the socket module really checks the + certificates. + """ + # command-starttls = "STARTTLS" CRLF + # response-starttls = response-oknobye + typ, data = self._command('STARTTLS') + if typ == 'OK': + sslobj = ssl_wrap_socket(self.sock, keyfile, certfile) + self.sock = SSLFakeSocket(self.sock, sslobj) + self.file = SSLFakeFile(sslobj) + # MUST discard knowledge obtained from the server + self.__clear_knowledge() + # Some servers send capabilities after TLS handshake, some + # do not. We send a bogus command, and expect a NO. If you + # get something else instead, read the extra NO to clear + # the buffer. + typ, data = self._command('BOGUS') + if typ != 'NO': + typ, data = self._get_response() + # server may not advertise capabilities, thus we need to ask + self.capability() + if self.debug >= 3: self._mesg('started Transport Layer Security (TLS)') + return typ, data diff --git a/Tests/Integration/preferences.py b/Tests/Integration/preferences.py index aa64875b9..52786408f 100644 --- a/Tests/Integration/preferences.py +++ b/Tests/Integration/preferences.py @@ -10,6 +10,7 @@ SOGoSupportedLanguages = [ "Catalan", "Czech", "Welsh", "English", "Spanish", "French", "German", "Italian", "Hungarian", "Dutch", "BrazilianPortuguese", "Norwegian", "Polish", "Russian", "Ukrainian", "Swedish" ] +daysBetweenResponseList=[1,2,3,5,7,14,21,30] class HTTPPreferencesPOST (webdavlib.HTTPPOST): cookie = None @@ -43,12 +44,35 @@ class preferences: authCookie = sogoLogin.getAuthCookie(hostname, port, username, password) self.cookie = authCookie - self.preferencesMap = {"SOGoLanguage": "2.1.0.3.0.1.4.3.1.3.1.1.2"} - # Duplicated from SOGoDefaults.plist + self.preferencesMap = { + "SOGoLanguage": "2.1.0.3.0.1.4.3.1.3.1.1.2", + "SOGoSieveFilters": "sieveFilters", + + # Vacation stuff + "Vacation": "enableVacation", # to disable, don't specify it + "autoReplyText": "autoReplyText", # string + "autoReplyEmailAddresses": "autoReplyEmailAddresses", # LIST + "daysBetweenResponse": "2.1.0.3.0.1.4.3.1.3.7.1.5.1.1.3.7.2", # see daysBetweenResponseList + "ignoreLists": "ignoreLists", #bool + + # forward stuff + "Forward": "enableForward", # to disable, don't specify it + "forwardAddress": "forwardAddress", + "keepCopy": "forwardKeepCopy", + } + + def set(self, preference, value=None): + # if preference is a dict, set all prefs found in the dict + content="" + try: + for k,v in preference.items(): + content+="%s=%s&" % (self.preferencesMap[k], v) + except AttributeError: + # preference wasn't a dict + formKey = self.preferencesMap[preference] + content = "%s=%s&hasChanged=1" % (formKey, value) + - def set(self, preference, value): - formKey = self.preferencesMap[preference] - content = "%s=%s&hasChanged=1" % (formKey, value) url = "/SOGo/so/%s/preferences" % self.login post = HTTPPreferencesPOST (url, content) @@ -79,3 +103,4 @@ class preferences: if __name__ == "__main__": p = preferences () p.set ("SOGoLanguage", SOGoSupportedLanguages.index("French")) + print p.get ("SOGoLanguage") diff --git a/Tests/Integration/test-caldav-scheduling.py b/Tests/Integration/test-caldav-scheduling.py index 4588770f0..a9fb92249 100755 --- a/Tests/Integration/test-caldav-scheduling.py +++ b/Tests/Integration/test-caldav-scheduling.py @@ -3,12 +3,15 @@ # setup: username must be super-user or have read-access to PUBLIC events in # both attendee and delegate's personal calendar -from config import hostname, port, username, password, attendee1, attendee1_delegate +from config import hostname, port, username, password, \ + attendee1, attendee1_delegate, \ + resource_no_overbook, resource_can_overbook import datetime import sogotests import sys import time +import pytz import unittest import utilities import vobject @@ -103,30 +106,13 @@ class CalDAVITIPDelegationTest(unittest.TestCase): (self.user_name, self.user_email) = utility.fetchUserInfo(username) (self.attendee1_name, self.attendee1_email) = utility.fetchUserInfo(attendee1) (self.attendee1_delegate_name, self.attendee1_delegate_email) = utility.fetchUserInfo(attendee1_delegate) + (self.res_no_ob_name, self.res_no_ob_email) = utility.fetchUserInfo(resource_no_overbook) + (self.res_can_ob_name, self.res_can_ob_email) = utility.fetchUserInfo(resource_can_overbook) self.user_calendar = "/SOGo/dav/%s/Calendar/personal/" % username self.attendee1_calendar = "/SOGo/dav/%s/Calendar/personal/" % attendee1 self.attendee1_delegate_calendar = "/SOGo/dav/%s/Calendar/personal/" % attendee1_delegate - def _newEvent(self): - newCal = vobject.iCalendar() - vevent = newCal.add('vevent') - vevent.add('summary').value = "test event" - vevent.add('transp').value = "OPAQUE" - - now = datetime.datetime.now() - startdate = vevent.add('dtstart') - startdate.value = now - enddate = vevent.add('dtend') - enddate.value = now + datetime.timedelta(0, 3600) - vevent.add('uid').value = "test-delegation" - vevent.add('dtstamp').value = now - vevent.add('last-modified').value = now - vevent.add('created').value = now - - vevent.add('sequence').value = "0" - - return newCal def tearDown(self): self._deleteEvent(self.client, @@ -136,6 +122,40 @@ class CalDAVITIPDelegationTest(unittest.TestCase): self._deleteEvent(self.client, "%stest-delegation.ics" % self.attendee1_delegate_calendar, None) + self._deleteEvent(self.client, + "%stest-add-attendee.ics" % self.user_calendar, None) + self._deleteEvent(self.client, + "%stest-add-attendee.ics" % self.attendee1_calendar, None) + self._deleteEvent(self.client, + "%stest-no-overbook.ics" % self.user_calendar, None) + self._deleteEvent(self.client, + "%stest-no-overbook-overlap.ics" % self.user_calendar, None) + self._deleteEvent(self.client, + "%stest-can-overbook.ics" % self.user_calendar, None) + self._deleteEvent(self.client, + "%stest-can-overbook-overlap.ics" % self.user_calendar, None) + + def _newEvent(self, summary="test event", uid="test", transp=0): + transparency = ("OPAQUE", "TRANSPARENT") + + newCal = vobject.iCalendar() + vevent = newCal.add('vevent') + vevent.add('summary').value = summary + vevent.add('transp').value = transparency[transp] + + now = datetime.datetime.now(pytz.timezone("America/Montreal")) + startdate = vevent.add('dtstart') + startdate.value = now + enddate = vevent.add('dtend') + enddate.value = now + datetime.timedelta(0, 3600) + vevent.add('uid').value = uid + vevent.add('dtstamp').value = now + vevent.add('last-modified').value = now + vevent.add('created').value = now + + vevent.add('sequence').value = "0" + + return newCal def _putEvent(self, client, filename, event, exp_status = 201): put = webdavlib.HTTPPUT(filename, event.serialize()) @@ -218,6 +238,118 @@ class CalDAVITIPDelegationTest(unittest.TestCase): % (email, compared_attendees[email], attendees[email])) + def testAddAttendee(self): + """ add attendee after event creation """ + + # make sure the event doesn't exist + ics_name = "test-add-attendee.ics" + self._deleteEvent(self.client, + "%s%s" % (self.user_calendar,ics_name), None) + self._deleteEvent(self.client, + "%s%s" % (self.attendee1_calendar,ics_name), None) + + # 1. create an event in the organiser's calendar + event = self._newEvent(summary="Test add attendee", uid="Test add attendee") + organizer = event.vevent.add('organizer') + organizer.cn_param = self.user_name + organizer.value = self.user_email + self._putEvent(self.client, "%s%s" % (self.user_calendar, ics_name), event) + + # 2. add an attendee + event.add("method").value = "REQUEST" + attendee = event.vevent.add('attendee') + attendee.cn_param = self.attendee1_name + attendee.rsvp_param = "TRUE" + attendee.partstat_param = "NEEDS-ACTION" + attendee.value = self.attendee1_email + self._putEvent(self.client, "%s%s" % (self.user_calendar, ics_name), event, + exp_status=204) + + + # 3. verify that the attendee has the event + attendee_event = self._getEvent(self.client, "%s%s" % (self.attendee1_calendar, ics_name)) + + # 4. make sure the received event match the original one + # XXX is this enough? + self.assertEquals(event.vevent.uid, attendee_event.vevent.uid) + + def testResourceNoOverbook(self): + """ try to overbook a resource """ + + # make sure the event doesn't exist + ics_name = "test-no-overbook.ics" + self._deleteEvent(self.client, + "%s%s" % (self.user_calendar,ics_name), None) + + ob_ics_name = "test-no-overbook-overlap.ics" + self._deleteEvent(self.client, + "%s%s" % (self.user_calendar,ics_name), None) + + # 1. create an event in the organiser's calendar + event = self._newEvent(summary="Test no overbook", uid="test no overbook") + organizer = event.vevent.add('organizer') + organizer.cn_param = self.user_name + organizer.value = self.user_email + attendee = event.vevent.add('attendee') + attendee.cn_param = self.res_no_ob_name + attendee.rsvp_param = "TRUE" + attendee.partstat_param = "NEEDS-ACTION" + attendee.value = self.res_no_ob_email + self._putEvent(self.client, "%s%s" % (self.user_calendar, ics_name), event) + + # 2. create a second event overlapping the first one + event = self._newEvent(summary="Test no overbook - overlap", uid="test no overbook - overlap") + organizer = event.vevent.add('organizer') + organizer.cn_param = self.user_name + organizer.value = self.user_email + attendee = event.vevent.add('attendee') + attendee.cn_param = self.res_no_ob_name + attendee.rsvp_param = "TRUE" + attendee.partstat_param = "NEEDS-ACTION" + attendee.value = self.res_no_ob_email + + # put the event - should trigger a 403 + self._putEvent(self.client, "%s%s" % (self.user_calendar, ob_ics_name), event, exp_status=403) + + def testResourceCanOverbook(self): + """ try to overbook a resource - multiplebookings=0""" + + # make sure the event doesn't exist + ics_name = "test-can-overbook.ics" + self._deleteEvent(self.client, + "%s%s" % (self.user_calendar,ics_name), None) + + ob_ics_name = "test-can-overbook-overlap.ics" + self._deleteEvent(self.client, + "%s%s" % (self.user_calendar,ob_ics_name), None) + + # 1. create an event in the organiser's calendar + event = self._newEvent(summary="Test can overbook", uid="test can overbook") + organizer = event.vevent.add('organizer') + organizer.cn_param = self.user_name + organizer.value = self.user_email + attendee = event.vevent.add('attendee') + attendee.cn_param = self.res_can_ob_name + attendee.rsvp_param = "TRUE" + attendee.partstat_param = "NEEDS-ACTION" + attendee.value = self.res_can_ob_email + self._putEvent(self.client, "%s%s" % (self.user_calendar, ics_name), event) + + # 2. create a second event overlapping the first one + event = self._newEvent(summary="Test can overbook - overlap", uid="test can overbook - overlap") + organizer = event.vevent.add('organizer') + organizer.cn_param = self.user_name + organizer.value = self.user_email + attendee = event.vevent.add('attendee') + attendee.cn_param = self.res_can_ob_name + attendee.rsvp_param = "TRUE" + attendee.partstat_param = "NEEDS-ACTION" + attendee.value = self.res_can_ob_email + + # put the event - should be fine since we can overbook this one + self._putEvent(self.client, "%s%s" % (self.user_calendar, ob_ics_name), event) + + def testInvitationDelegation(self): """ invitation delegation """ diff --git a/Tests/Integration/test-sieve.py b/Tests/Integration/test-sieve.py new file mode 100644 index 000000000..e9594c0da --- /dev/null +++ b/Tests/Integration/test-sieve.py @@ -0,0 +1,154 @@ +#!/usr/bin/python +from config import hostname, port, username, password, sieve_port, sieve_server + +import managesieve +import preferences +import sogotests +import unittest +import utilities +import webdavlib + +sieve_simple_vacation="""require ["vacation"];\r\nvacation :days %(days)d :addresses ["%(mailaddr)s"] text:\r\n%(vacation_msg)s\r\n.\r\n;\r\n""" +sieve_vacation_ignoreLists="""require ["vacation"];\r\nif allof ( not exists ["list-help", "list-unsubscribe", "list-subscribe", "list-owner", "list-post", "list-archive", "list-id", "Mailing-List"], not header :comparator "i;ascii-casemap" :is "Precedence" ["list", "bulk", "junk"], not header :comparator "i;ascii-casemap" :matches "To" "Multiple recipients of*" ) {vacation :days %(days)d :addresses ["%(mailaddr)s"] text:\r\n%(vacation_msg)s\r\n.\r\n;\r\n}\r\n""" +sieve_simple_forward="""redirect "%(redirect_mailaddr)s";\r\n""" +sieve_forward_keep="""redirect "%(redirect_mailaddr)s";\r\nkeep;\r\n""" +sieve_simple_filter="""require ["fileinto"];\r\nif anyof (header :contains "subject" "%(subject)s") {\r\n fileinto "%(folderName)s";\r\n}\r\n""" + +class sieveTest(unittest.TestCase): + def _killFilters(self): + filtersKill={} + # kill existing filters + filtersKill["SOGoSieveFilters"]= "[]" + # vacation filters + filtersKill["autoReplyText"] = "" + filtersKill["autoReplyEmailAddresses"] = "" + # forwarding filters + filtersKill["forwardAddress"] = "" + + self.prefs=preferences.preferences() + self.prefs.set(filtersKill) + + def setUp(self): + ret = "" + + self.client = webdavlib.WebDAVClient(hostname, port, + username, password) + utility = utilities.TestUtility(self, self.client) + (self.user_name, self.user_email) = utility.fetchUserInfo(username) + self.user_email = self.user_email.replace("mailto:", "") + + self.ms = managesieve.MANAGESIEVE(sieve_server, sieve_port) + self.assertEqual(self.ms.login("", username, password), "OK", + "Couldn't login") + + self._killFilters() + + def tearDown(self): + self._killFilters() + + def _getSogoSieveScript(self): + sieveFoundsogo=0 + createSieveScript="" + (ret, sieveScriptList) = self.ms.listscripts() + self.assertEqual(ret, "OK", "Couldn't get sieve script list") + + for (script, isActive) in sieveScriptList: + if (script == "sogo"): + sieveFoundsogo=1 + self.assertEqual(isActive, True, "sogo sieve script is not active!") + (ret, createdSieveScript) = self.ms.getscript(script) + self.assertEqual(ret, "OK", "Couldn't get sogo sieve script") + + self.assertEqual(sieveFoundsogo, 1, "sogo sieve script not found!") + + return createdSieveScript + + def testSieveSimpleVacation(self): + """ enable simple vacation script """ + vacation_msg="vacation test" + daysSelect=4 + + sieveScript = sieve_simple_vacation % { "mailaddr": self.user_email, + "vacation_msg": vacation_msg, + "days": preferences.daysBetweenResponseList[daysSelect], + } + + filterAdd = {"Vacation":1, + "autoReplyText": vacation_msg, + "daysBetweenResponse": daysSelect, + "autoReplyEmailAddresses": self.user_email, + } + + self.prefs.set(filterAdd) + createdSieveScript=self._getSogoSieveScript() + + self.assertEqual(sieveScript, createdSieveScript) + + def testSieveVacationIgnoreLists(self): + """ enable vacation script - ignore lists""" + vacation_msg="vacation test - ignore list" + daysSelect=4 + + sieveScript = sieve_vacation_ignoreLists % { "mailaddr": self.user_email, + "vacation_msg": vacation_msg, + "days": preferences.daysBetweenResponseList[daysSelect], + } + + filterAdd = {"Vacation":1, + "autoReplyText": vacation_msg, + "daysBetweenResponse": daysSelect, + "autoReplyEmailAddresses": self.user_email, + "ignoreLists": 1, + } + + self.prefs.set(filterAdd) + createdSieveScript=self._getSogoSieveScript() + + self.assertEqual(sieveScript, createdSieveScript) + + def testSieveSimpleForward(self): + """ enable simple forwarding """ + redirect_mailaddr="nonexistent@inverse.com" + + sieveScript = sieve_simple_forward % { "redirect_mailaddr": redirect_mailaddr } + + filterAdd = { "Forward": 1, + "forwardAddress": redirect_mailaddr, + } + + self.prefs.set(filterAdd) + createdSieveScript = self._getSogoSieveScript() + self.assertEqual(sieveScript, createdSieveScript) + + def testSieveForwardKeepCopy(self): + """ enable email forwarding - keep a copy """ + redirect_mailaddr="nonexistent@inverse.com" + + sieveScript = sieve_forward_keep % { "redirect_mailaddr": redirect_mailaddr } + + filterAdd = { "Forward": 1, + "forwardAddress": redirect_mailaddr, + "keepCopy": 1, + } + + self.prefs.set(filterAdd) + createdSieveScript = self._getSogoSieveScript() + self.assertEqual(sieveScript, createdSieveScript) + + def testSieveSimpleFilter(self): + """ add simple sieve filter """ + folderName="Sent" + subject=__name__ + + sieveScript=sieve_simple_filter % { "subject": subject, "folderName": folderName } + + filterAdd = { "SOGoSieveFilters": """[{"active": true, "actions": [{"method": "fileinto", "argument": "Sent"}], "rules": [{"operator": "contains", "field": "subject", "value": "%s"}], "match": "any", "name": "%s"}]""" % (subject, folderName) + } + + self.prefs.set(filterAdd) + createdSieveScript = self._getSogoSieveScript() + self.assertEqual(sieveScript, createdSieveScript) + + +if __name__ == "__main__": + sogotests.runTests()