Changeset 503 in tailor for vcpx/cvsps.py
- Timestamp:
- 08/05/05 12:23:23 (8 years ago)
- Hash name:
- 20050805102323-97f81-4de4aa10a473c22b5148200047138b58575375b3
- File:
-
- 1 edited
-
vcpx/cvsps.py (modified) (28 diffs)
Legend:
- Unmodified
- Added
- Removed
-
vcpx/cvsps.py
r467 r503 4 4 # :Autore: Lele Gaifax <lele@nautilus.homeip.net> 5 5 # :Licenza: GNU General Public License 6 # 6 # 7 7 8 8 """ … … 20 20 21 21 CVS_CMD = 'cvs' 22 CVSPS_CMD = 'cvsps' 22 CVSPS_CMD = 'cvsps' 23 23 24 24 def changesets_from_cvsps(log, sincerev=None): … … 30 30 from datetime import datetime 31 31 from cvs import compare_cvs_revs 32 32 33 33 # cvsps output sample: 34 34 ## --------------------- … … 140 140 141 141 ## UpdatableSourceWorkingDir 142 142 143 143 def _getUpstreamChangesets(self, root, repository, module, sincerev=None, 144 144 branch=None): … … 155 155 if sincerev: 156 156 sincerev = int(sincerev) 157 157 158 158 changesets = [] 159 159 cmd = [CVSPS_CMD, "--cvs-direct", "-u", "-b", branch, … … 161 161 cvsps = ExternalCommand(command=cmd) 162 162 log = cvsps.execute(module, stdout=PIPE, TZ='UTC') 163 163 164 164 for cs in changesets_from_cvsps(log, sincerev): 165 165 changesets.append(cs) … … 170 170 from os.path import join, exists 171 171 from os import listdir 172 172 173 173 if not entrydir: 174 174 return … … 178 178 deldir = changeset.addEntry(entrydir, None) 179 179 deldir.action_kind = deldir.DELETED 180 180 181 181 def _applyChangeset(self, root, changeset, logger=None): 182 182 from os.path import join, exists, dirname, split … … 185 185 from cvs import CvsEntries 186 186 from time import sleep 187 187 188 188 entries = CvsEntries(root) 189 189 … … 219 219 # remove it with everything it contains (that should be 220 220 # just a single "CVS" subdir, btw) 221 221 222 222 if e.action_kind == e.DELETED and e.new_revision is None: 223 223 assert listdir(join(root, e.name)) == ['CVS'], '%s should be empty' % e.name … … 229 229 while True: 230 230 cvsup.execute(e.name, stdout=PIPE) 231 231 232 232 if cvsup.exit_status: 233 233 retry += 1 … … 243 243 else: 244 244 break 245 245 246 246 if cvsup.exit_status: 247 247 raise ChangesetApplicationFailure( … … 289 289 if timestamp == 'INITIAL': 290 290 timestamp = csets[-1].date.isoformat(sep=' ') 291 291 292 292 if not exists(join(wdir, 'CVS')): 293 293 cmd = [CVS_CMD, "-q", "-d", repository, "checkout", … … 297 297 if timestamp: 298 298 cmd.extend(["-D", "%s UTC" % timestamp]) 299 299 300 300 checkout = ExternalCommand(cwd=basedir, command=cmd) 301 301 checkout.execute(module) 302 302 303 303 if checkout.exit_status: 304 304 raise TargetInitializationFailure( … … 307 307 else: 308 308 if logger: logger.info("Using existing %s", wdir) 309 309 310 310 self.__forceTagOnEachEntry(wdir) 311 311 312 312 entries = CvsEntries(wdir) 313 313 314 314 # update cvsps cache, then loop over the changesets and find the 315 315 # last applied, to find out the actual cvsps revision … … 324 324 if not found: 325 325 break 326 326 327 327 if found: 328 328 last = cset … … 336 336 if logger: logger.info("working copy up to cvsps revision %s", 337 337 last.revision) 338 338 339 339 return last 340 340 341 341 def _willApplyChangeset(self, root, changeset, applyable=None): 342 342 """ 343 343 This gets called just before applying each changeset. 344 344 345 345 Since CVS has no "createdir" event, we have to take care 346 346 of new directories, creating empty-but-reasonable CVS dirs. … … 352 352 if m.action_kind == m.ADDED: 353 353 self.__createParentCVSDirectories(changeset, root, m.name) 354 354 355 355 return True 356 356 else: 357 357 return False 358 358 359 359 def __createParentCVSDirectories(self, changeset, root, entry): 360 360 """ … … 365 365 'cvs update' will work. 366 366 """ 367 367 368 368 from os.path import split, join, exists 369 369 from os import mkdir … … 373 373 basedir = join(root, path) 374 374 else: 375 basedir = root 375 basedir = root 376 376 cvsarea = join(basedir, 'CVS') 377 377 378 378 if path and not exists(cvsarea): 379 379 parentcvs = self.__createParentCVSDirectories(changeset, … … 381 381 382 382 assert exists(parentcvs), "Uhm, strange things happen" 383 383 384 384 if not exists(basedir): 385 385 mkdir(basedir) … … 413 413 entry = changeset.addEntry(path, None) 414 414 entry.action_kind = entry.ADDED 415 415 416 416 return cvsarea 417 417 418 418 ## SyncronizableTargetWorkingDir 419 419 … … 434 434 after a manual ``cvs update`` in the working directory. 435 435 """ 436 436 437 437 from os import walk, rename 438 438 from os.path import join … … 445 445 f.close() 446 446 rename(efn, efn+'.old') 447 447 448 448 newentries = [] 449 449 for e in entries: … … 459 459 f.writelines(newentries) 460 460 f.close() 461 461 462 462 def _getCommitEntries(self, changeset): 463 463 """ … … 472 472 473 473 return entries 474 474 475 475 def _commit(self,root, date, author, remark, changelog=None, entries=None): 476 476 """ … … 480 480 from shwrap import ReopenableNamedTemporaryFile 481 481 from sys import getdefaultencoding 482 482 483 483 encoding = ExternalCommand.FORCE_ENCODING or getdefaultencoding() 484 484 485 485 logmessage = [] 486 486 if remark: … … 496 496 log = open(rontf.name, "w") 497 497 log.write('\n'.join(log)) 498 log.close() 498 log.close() 499 499 500 500 cmd = [CVS_CMD, "-q", "ci", "-F", rontf.name] 501 501 if not entries: 502 502 entries = ['.'] 503 503 504 504 ExternalCommand(cwd=root, command=cmd).execute(entries) 505 505 506 506 def _removePathnames(self, root, names): 507 507 """
Note: See TracChangeset
for help on using the changeset viewer.
