# -*- mode: python; coding: utf-8 -*- # :Progetto: vcpx -- CVS details # :Creato: mer 16 giu 2004 00:46:12 CEST # :Autore: Lele Gaifax # :Licenza: GNU General Public License # """ This module contains supporting classes for CVS. To get a cross-repository revision number ala Subversion, the implementation uses `cvsps` to fetch the changes from the upstream repository. """ __docformat__ = 'reStructuredText' from shwrap import ExternalCommand, PIPE, STDOUT from source import UpdatableSourceWorkingDir, ChangesetApplicationFailure, \ InvocationError from target import SyncronizableTargetWorkingDir, TargetInitializationFailure class EmptyRepositoriesFoolsMe(Exception): "Cannot handle empty repositories. Maybe wrong module/repository?" # This is the exception raised when we try to tailor an empty CVS # repository. This is more a shortcoming of tailor, rather than a # real problem with those repositories. def changesets_from_cvsps(log, sincerev=None): """ Parse CVSps log. """ from changes import Changeset, ChangesetEntry from datetime import datetime from cvs import compare_cvs_revs # cvsps output sample: ## --------------------- ## PatchSet 1500 ## Date: 2004/05/09 17:54:22 ## Author: grubert ## Branch: HEAD ## Tag: (none) ## Log: ## Tell the reason for using mbox (not wrapping long lines). 
## ## Members: ## docutils/writers/latex2e.py:1.78->1.79 l = None while 1: l = log.readline() if l <> '---------------------\n': break l = log.readline() assert l.startswith('PatchSet '), "Parse error: %s"%l pset = {} pset['revision'] = l[9:-1].strip() l = log.readline() while not l.startswith('Log:'): field,value = l.split(':',1) pset[field.lower()] = value.strip() l = log.readline() msg = [] l = log.readline() msg.append(l) l = log.readline() while l <> 'Members: \n': msg.append(l) l = log.readline() pset['log'] = ''.join(msg) assert l.startswith('Members:'), "Parse error: %s" % l pset['entries'] = entries = [] l = log.readline() seen = {} while l.startswith('\t'): if not sincerev or (sincerev') # Due to the fuzzy mechanism, cvsps may group # together two commits on a single entry, thus # giving something like: # # Normalizer.py:1.12->1.13 # Registry.py:1.22->1.23 # Registry.py:1.21->1.22 # Stopwords.py:1.9->1.10 # # Collapse those into a single one. e = seen.get(file) if not e: e = ChangesetEntry(file) e.old_revision = fromrev e.new_revision = torev seen[file] = e entries.append(e) else: if compare_cvs_revs(e.old_revision, fromrev)>0: e.old_revision = fromrev if compare_cvs_revs(e.new_revision, torev)<0: e.new_revision = torev if fromrev=='INITIAL': e.action_kind = e.ADDED elif "(DEAD)" in torev: e.action_kind = e.DELETED e.new_revision = torev[:torev.index('(DEAD)')] else: e.action_kind = e.UPDATED l = log.readline() if not sincerev or (sincerev3: break delay = 2**retry self.log_info("%s returned status %s, " "retrying in %d seconds..." 
# NOTE(review): every definition below takes ``self`` and delegates to
# UpdatableSourceWorkingDir / SyncronizableTargetWorkingDir, so they appear
# to be methods of a working-directory class whose ``class`` statement is
# not part of the visible text -- re-indent them under that class when
# splicing this back.  The nesting was reconstructed from flattened text.

def _checkoutUpstreamRevision(self, revision):
    """
    Concretely do the checkout of the upstream sources.

    `revision` selects what gets fetched:

    - ``None``: neither a branch nor a timestamp is passed to CVS;
    - ``'INITIAL'``: check out as of the date of the first pending
      changeset;
    - a string starting with a digit: it is used as a timestamp;
    - ``'<branch> <timestamp>'``: a branch plus a timestamp;
    - anything else: the name of a branch/tag.

    If a ``CVS`` directory already exists under ``self.basedir`` no
    checkout is performed and the existing tree is used.

    Return the changeset that matches the state of the working copy.

    Raise `InvocationError` when no module is configured,
    `TargetInitializationFailure` when there are no changesets, when
    ``cvs checkout`` fails or when no changeset matches the tree, and
    `EmptyRepositoriesFoolsMe` when the working copy has no entries.
    """

    from os.path import join, exists
    from cvs import CvsEntries, compare_cvs_revs

    if not self.repository.module:
        raise InvocationError("Must specify a module name")

    timestamp = None
    if revision is not None:
        # A leading digit (or the special name INITIAL) means the
        # whole value is a timestamp; otherwise a space separates a
        # branch name from a timestamp; otherwise it is a plain
        # branch name.
        if revision[0] in '0123456789' or revision == 'INITIAL':
            timestamp = revision
            revision = None
        elif ' ' in revision:
            revision, timestamp = revision.split(' ', 1)

    csets = self.getPendingChangesets(revision)
    # NOTE(review): `csets` is consumed with .next() below, so it is
    # presumably an iterator; unless it defines its own truth value
    # this test never fires -- confirm against getPendingChangesets().
    if not csets:
        raise TargetInitializationFailure(
            "Something went wrong: there are no changesets since "
            "revision '%s'" % revision)

    if timestamp == 'INITIAL':
        try:
            cset = csets.next()
        except StopIteration:
            # Report an exhausted iterator the same way as the test
            # above, instead of leaking a bare StopIteration.
            raise TargetInitializationFailure(
                "Something went wrong: there are no changesets since "
                "revision '%s'" % revision)
        timestamp = cset.date.isoformat(sep=' ')
    else:
        cset = None

    if not exists(join(self.basedir, 'CVS')):
        cmd = [self.repository.CVS_CMD, "-q",
               "-d", self.repository.repository,
               "checkout", "-d", self.repository.subdir]
        if revision:
            cmd.extend(["-r", revision])
        if timestamp:
            # The timestamp is handed to CVS explicitly marked as UTC
            cmd.extend(["-D", "%s UTC" % timestamp])

        checkout = ExternalCommand(cwd=self.repository.rootdir, command=cmd)
        checkout.execute(self.repository.module)

        if checkout.exit_status:
            raise TargetInitializationFailure(
                "%s returned status %s" % (str(checkout),
                                           checkout.exit_status))
    else:
        self.log_info("Using existing %s" % self.basedir)

    # NOTE(review): assumed to run for fresh checkouts and existing
    # trees alike (nesting reconstructed) -- confirm.
    self.__forceTagOnEachEntry()

    entries = CvsEntries(self.basedir)
    youngest_entry = entries.getYoungestEntry()
    if youngest_entry is None:
        raise EmptyRepositoriesFoolsMe("The working copy '%s' of the "
                                       "CVS repository seems empty, "
                                       "don't know how to deal with "
                                       "that." % self.basedir)

    # Loop over the changesets and find the last applied, to find
    # out the actual cvsps revision

    found = False

    if cset is None:
        try:
            cset = csets.next()
        except StopIteration:
            cset = None

    while cset is not None:
        for m in cset.entries:
            info = entries.getFileInfo(m.name)
            # Entries unknown to the working copy do not take part in
            # the comparison.
            # NOTE(review): the `break` is assumed to sit inside this
            # `if` (nesting reconstructed) -- confirm.
            if info:
                actualversion = info.cvs_version
                found = compare_cvs_revs(actualversion, m.new_revision) == 0
                if not found:
                    break

        if found:
            last = cset
            break

        try:
            cset = csets.next()
        except StopIteration:
            cset = None

    if not found:
        raise TargetInitializationFailure(
            "Something went wrong: unable to determine the exact upstream "
            "revision of the checked out tree in '%s'" % self.basedir)
    else:
        self.log_info("working copy up to cvs revision %s" % last.revision)

    return last


def _willApplyChangeset(self, changeset, applyable=None):
    """
    This gets called just before applying each changeset.

    Since CVS has no "createdir" event, we have to take care of new
    directories, creating empty-but-reasonable CVS dirs for every
    entry that is being added.

    Return False when the base class vetoes the changeset, True
    otherwise.
    """

    if not UpdatableSourceWorkingDir._willApplyChangeset(self, changeset,
                                                         applyable):
        return False

    for m in changeset.entries:
        if m.action_kind == m.ADDED:
            self.__createParentCVSDirectories(changeset, m.name)

    return True


def __createParentCVSDirectories(self, changeset, entry):
    """
    Verify that the hierarchy down to the entry is under CVS.

    If the directory containing the entry does not exist, create it
    and make it appear as under CVS, so that succeeding 'cvs update'
    will work.  Missing ancestors are handled first, recursively, and
    each directory created this way is added to `changeset` as an
    ADDED entry.

    `entry` is a pathname relative to ``self.basedir``.  Return the
    path of the ``CVS`` area of the directory containing `entry`.
    """

    from os.path import split, join, exists
    from os import mkdir

    path = split(entry)[0]
    if path:
        basedir = join(self.basedir, path)
    else:
        basedir = self.basedir
    cvsarea = join(basedir, 'CVS')

    if path and not exists(cvsarea):
        # The parent must be under CVS before its child can be faked
        parentcvs = self.__createParentCVSDirectories(changeset, path)

        assert exists(parentcvs), "Uhm, strange things happen"

        if not exists(basedir):
            mkdir(basedir)

        # Create fake CVS area
        mkdir(cvsarea)

        # Create an empty "Entries" file
        open(join(cvsarea, 'Entries'), 'w').close()

        # The new "Repository" is the parent's one plus this
        # directory's name
        reposf = open(join(parentcvs, 'Repository'))
        try:
            # Strip only the line terminator: a file lacking the
            # final newline must not lose its last character.
            rep = reposf.readline().rstrip('\n')
        finally:
            reposf.close()

        reposf = open(join(cvsarea, 'Repository'), 'w')
        try:
            reposf.write("%s/%s\n" % (rep, split(basedir)[1]))
        finally:
            reposf.close()

        # "Root" is copied verbatim from the parent
        rootf = open(join(parentcvs, 'Root'))
        try:
            root = rootf.readline()
        finally:
            rootf.close()

        rootf = open(join(cvsarea, 'Root'), 'w')
        try:
            rootf.write(root)
        finally:
            rootf.close()

        # Add the "new" directory to the changeset, so that the
        # replayer get its name
        direntry = changeset.addEntry(path, None)
        direntry.action_kind = direntry.ADDED

    return cvsarea


## SyncronizableTargetWorkingDir

def _addPathnames(self, names):
    """
    Add some new filesystem objects, running ``cvs add`` on `names`
    from within ``self.basedir``.

    The exit status of the command is not examined.
    """

    cmd = [self.repository.CVS_CMD, "-q", "add"]
    ExternalCommand(cwd=self.basedir, command=cmd).execute(names)


def __forceTagOnEachEntry(self):
    """
    Massage each CVS/Entries file, locking (ie, tagging) each entry
    to its current CVS version.

    This is to prevent silly errors such those that could arise
    after a manual ``cvs update`` in the working directory.

    Each original file is kept aside as ``Entries.old``.
    """

    from os import walk, rename
    from os.path import join, basename

    for dirpath, subdirs, files in walk(self.basedir):
        # Compare the whole directory name: testing just the last
        # three characters would also catch things like "fooCVS".
        if basename(dirpath) != 'CVS':
            continue

        efn = join(dirpath, 'Entries')
        f = open(efn)
        try:
            entries = f.readlines()
        finally:
            f.close()
        rename(efn, efn + '.old')

        newentries = []
        for e in entries:
            if e.startswith('/'):
                # Replace the last field with a "T" tag built from the
                # third field, the one this docstring calls the
                # current CVS version
                fields = e.split('/')
                fields[-1] = "T%s\n" % fields[2]
                newentries.append('/'.join(fields))
            else:
                # Anything else is copied over untouched
                newentries.append(e)

        f = open(efn, 'w')
        try:
            f.writelines(newentries)
        finally:
            f.close()


def _getCommitEntries(self, changeset):
    """
    Extract the names of the entries for the commit phase.

    Since CVS does not have a "rename" operation, this is simulated
    by a remove+add, and both entries must be committed: the old name
    of each renamed entry is appended to what the base class returns.
    """

    entries = SyncronizableTargetWorkingDir._getCommitEntries(self,
                                                              changeset)
    entries.extend([e.old_name for e in changeset.renamedEntries()])

    return entries


def _commit(self, date, author, patchname, changelog=None, entries=None):
    """
    Commit the changeset.

    The log message is built from `patchname`, `changelog`, `author`
    and `date`, written to a temporary file and handed to ``cvs ci``
    through its ``-F`` option.  When `entries` is empty the whole
    working directory (``.``) is committed.

    Raise `ChangesetApplicationFailure` if the command exits with a
    non zero status.
    """

    from shwrap import ReopenableNamedTemporaryFile
    from sys import getdefaultencoding

    encoding = ExternalCommand.FORCE_ENCODING or getdefaultencoding()

    logmessage = []
    if patchname:
        logmessage.append(patchname.encode(encoding))
    if changelog:
        # NOTE(review): '%' gets doubled here although the text goes
        # to a file and not through a format string -- confirm this
        # is still wanted.
        logmessage.append(changelog.replace('%', '%%').encode(encoding))
    logmessage.append('')
    logmessage.append('Original author: %s' % author.encode(encoding))
    logmessage.append('Date: %s' % date)
    logmessage.append('')

    rontf = ReopenableNamedTemporaryFile('cvs', 'tailor')
    logfile = open(rontf.name, "w")
    try:
        # Write the collected message lines; joining the file object
        # itself, as was done before, never stored the message.
        logfile.write('\n'.join(logmessage))
    finally:
        logfile.close()

    cmd = [self.repository.CVS_CMD, "-q", "ci", "-F", rontf.name]

    if not entries:
        entries = ['.']

    c = ExternalCommand(cwd=self.basedir, command=cmd)
    c.execute(entries)

    if c.exit_status:
        raise ChangesetApplicationFailure("%s returned status %d" %
                                          (str(c), c.exit_status))


def _removePathnames(self, names):
    """
    Remove some filesystem objects, running ``cvs remove`` on `names`
    from within ``self.basedir``.

    The exit status of the command is not examined.
    """

    cmd = [self.repository.CVS_CMD, "-q", "remove"]
    ExternalCommand(cwd=self.basedir, command=cmd).execute(names)


def _renamePathname(self, oldname, newname):
    """
    Rename a filesystem object.

    CVS has no rename operation, so this removes `oldname` and adds
    `newname`.
    """

    self._removePathnames([oldname])
    self._addPathnames([newname])