source: tailor/vcpx/darcs.py @ 99

Revision 99, 9.7 KB checked in by lele@…, 9 years ago (diff)

Refix _getUpstreamChangesets for darcs

Line 
1#! /usr/bin/python
2# -*- mode: python; coding: utf-8 -*-
3# :Progetto: vcpx -- Darcs details
4# :Creato:   ven 18 giu 2004 14:45:28 CEST
5# :Autore:   Lele Gaifax <lele@nautilus.homeip.net>
6#
7
8"""
9This module contains supporting classes for the `darcs` versioning system.
10"""
11
12__docformat__ = 'reStructuredText'
13
14from shwrap import SystemCommand
15from source import UpdatableSourceWorkingDir
16from target import SyncronizableTargetWorkingDir, TargetInitializationFailure
17
18
19class DarcsInitialize(SystemCommand):
20    COMMAND = "darcs initialize"
21
22
23class DarcsRecord(SystemCommand):
24    COMMAND = "darcs record --all --pipe --look-for-adds %(entries)s"
25
26    def __call__(self, output=None, dry_run=False, **kwargs):
27        date = kwargs.get('date').strftime('%Y/%m/%d %H:%M:%S')
28        author = kwargs.get('author')
29        patchname = kwargs.get('patchname')
30        logmessage = kwargs.get('logmessage')
31        if not logmessage:
32            logmessage = ''
33
34        input = "%s\n%s\n%s\n%s\n" % (date, author, patchname, logmessage)
35       
36        return SystemCommand.__call__(self, output=output, input=input,
37                                      dry_run=dry_run, 
38                                      **kwargs)
39
40
41class DarcsMv(SystemCommand):
42    COMMAND = "darcs mv --standard-verbosity %(old)s %(new)s"
43
44
45class DarcsRemove(SystemCommand):
46    COMMAND = "darcs remove --standard-verbosity %(entry)s"
47
48
49class DarcsAdd(SystemCommand):
50    COMMAND = "darcs add --not-recursive --standard-verbosity %(entry)s"
51
52
53class DarcsTag(SystemCommand):
54    COMMAND = "darcs tag --standard-verbosity --patch-name='%(tagname)s'"
55
56
57class DarcsChanges(SystemCommand):
58    COMMAND = "darcs changes --from-patch='%(patch)s' --xml-output --summary"
59
60
61class DarcsAnnotate(SystemCommand):
62    COMMAND = "darcs annotate --standard-verbosity %(entry)s >/dev/null 2>&1"
63   
64class DarcsWorkingDir(UpdatableSourceWorkingDir,SyncronizableTargetWorkingDir):
65    """
66    A working directory under ``darcs``.
67    """
68
69    def __init__(self):
70        self.__visitedDirs = {}
71       
72    ## UpdatableSourceWorkingDir
73   
74    def _getUpstreamChangesets(self, root, sincerev=None):
75        """
76        Do the actual work of fetching the upstream changeset.
77       
78        This is different from the other VC mechanisms: here we want
79        register with the target the changes we submitted to this
80        repository to be sent back to upstream. Since we may want to
81        regroup the various patchsets into a single one, we first
82        manually pull here what we wanna send, then the sync will replay
83        the changes of all new changesets.
84       
85        So, here we actually list the changes after the last tag, not
86        those pending on the other side.
87        """
88
89        c = DarcsChanges(working_dir=root)
90        changes = c(output=True, patch=sincerev)
91
92        changesets = self.__parseDarcsChanges(changes)
93
94        # sort changeset by date
95        changesets.sort(lambda x, y: cmp(x.date, y.date))
96
97        return changesets
98   
99    def __parseDarcsChanges(self, changes):
100        from xml.sax import parseString
101        from xml.sax.handler import ContentHandler
102        from changes import ChangesetEntry, Changeset
103        from datetime import datetime
104       
105        class DarcsXMLChangesHandler(ContentHandler):
106            def __init__(self):
107                self.changesets = []
108                self.current = None
109                self.current_field = []
110
111            def startElement(self, name, attributes):
112                if name == 'patch':
113                    self.current = {}
114                    self.current['author'] = attributes['author']
115                    date = attributes['date']
116                    # 20040619130027
117                    y = int(date[:4])
118                    m = int(date[4:6])
119                    d = int(date[6:8])
120                    hh = int(date[8:10])
121                    mm = int(date[10:12])
122                    ss = int(date[12:14])
123                    timestamp = datetime(y, m, d, hh, mm, ss)
124                    self.current['date'] = timestamp
125                    self.current['revision'] = attributes['revision']
126                    self.current['entries'] = []
127                elif name in ['name', 'comment',
128                              'add_file', 'add_directory',
129                              'modify_file', 'remove_file']:
130                    self.current_field = []
131                elif name == 'path':
132                    self.current_field = []
133                    if attributes.has_key('copyfrom-path'):
134                        self.current_path_action = (
135                            attributes['action'],
136                            attributes['copyfrom-path'][1:], # make it relative
137                            attributes['copyfrom-rev'])
138                    else:
139                        self.current_path_action = attributes['action']
140                elif name == 'move':
141                    self.old_name = attributes['from']
142                    self.new_name = attributes['to']
143                   
144            def endElement(self, name):
145                if name == 'patch':
146                    # Sort the paths to make tests easier
147                    self.current['entries'].sort()
148                    self.changesets.append(Changeset(self.current['name'],
149                                                     self.current['date'],
150                                                     self.current['author'],
151                                                     self.current['comment'],
152                                                     self.current['entries']))
153                    self.current = None
154                elif name in ['name', 'comment']:
155                    self.current[name] = ''.join(self.current_field)
156                elif name == 'move':
157                    entry = ChangesetEntry(self.new_name)
158                    entry.action_kind = entry.RENAMED
159                    entry.old_name = self.old_name
160                    self.current['entries'].append(entry)
161                elif name in ['add_file', 'add_directory',
162                              'modify_file', 'remove_file']:
163                    entry = ChangesetEntry(''.join(self.current_field))
164                    entry.action_kind = { 'add_file': entry.ADDED,
165                                          'add_directory': entry.ADDED,
166                                          'modify_file': entry.UPDATED,
167                                          'remove_file': entry.DELETED,
168                                          'rename_file': entry.RENAMED
169                                        }[name]
170
171                    self.current['entries'].append(entry)
172
173            def characters(self, data):
174                self.current_field.append(data)
175       
176        handler = DarcsXMLChangesHandler()
177        parseString(changes.getvalue(), handler)
178        return handler.changesets
179       
180    def _applyChangeset(self, root, changeset, logger=None):
181        """
182        Do the actual work of applying the changeset to the working copy.
183
184        The changeset is already applied, so this is a do nothing method.
185        """
186
187        return
188   
189    ## SyncronizableTargetWorkingDir
190
191    def _addEntry(self, root, entry):
192        """
193        Add a new entry, maybe registering the directory as well.
194        """
195
196        from os.path import split, join, exists
197
198        basedir = split(entry)[0]
199
200        if basedir:
201            if not self.__visitedDirs.get(basedir):
202                # This is ugly, but I didn't find a better way to test
203                # whether a particular directory is already version
204                # controlled by darcs.
205       
206                dannot = DarcsAnnotate(working_dir=root)
207       
208                dannot(entry=basedir)
209                if dannot.exit_status:
210                    self._addEntry(root, basedir)
211
212                self.__visitedDirs[basedir] = True
213               
214        c = DarcsAdd(working_dir=root)
215        c(entry=entry)
216
217       
218    def _commit(self,root, date, author, remark, changelog=None, entries=None):
219        """
220        Commit the changeset.
221        """
222
223        c = DarcsRecord(working_dir=root)
224
225        if entries:
226            entries = ' '.join(entries)
227        else:
228            entries = '.'
229           
230        c(output=True, date=date, patchname=remark,
231          logmessage=changelog, author=author, entries=entries)
232       
233    def _removeEntry(self, root, entry):
234        """
235        Remove an entry.
236        """
237
238        c = DarcsRemove(working_dir=root)
239        c(entry=entry)
240
241    def _renameEntry(self, root, oldentry, newentry):
242        """
243        Rename an entry.
244        """
245
246        c = DarcsMv(working_dir=root)
247        c(old=oldentry, new=newentry)
248
249    def _createTag(self, root, tagname):
250        """
251        Tag the current situation and remember this as the *last tag*.
252        """
253
254        from os.path import join, exists
255       
256        c = DarcsTag(working_dir=root)
257        c(output=True, tagname=tagname)
258       
259        fname = join(root, '_darcs', 'last-sync-tag')
260        f = open(fname, 'w')
261        f.write(tagname)
262        f.close()
263       
264    def _getLastTag(self, root):
265        """
266        Return the name of the last emitted tag, if any, otherwise None.
267        """
268       
269        from os.path import join, exists
270       
271        fname = join(root, '_darcs', 'last-sync-tag')
272        if exists(fname):
273            f = open(fname)
274            tagname = f.read()
275            f.close()
276           
277            return tagname
278
279    def _initializeWorkingDir(self, root, module):
280        """
281        Execute `darcs initialize`.
282        """
283       
284        c = DarcsInitialize(working_dir=root)
285        c(output=True)
286
287        if c.exit_status:
288            raise TargetInitializationFailure(
289                "'darcs initialize' returned status %s" % c.exit_status)
290
Note: See TracBrowser for help on using the repository browser.