source: tailor/vcpx/shwrap.py @ 380

Revision 380, 5.1 KB checked in by lele@…, 8 years ago (diff)

Do not suggest every source is a script by itself

Line 
1# -*- mode: python; coding: iso-8859-1 -*-
2# :Progetto: vcpx -- Tiny wrapper around external command
3# :Creato:   sab 10 apr 2004 16:43:48 CEST
4# :Autore:   Lele Gaifax <lele@nautilus.homeip.net>
5# :Licenza:  GNU General Public License
6#
7
8__docformat__ = 'reStructuredText'
9
10from StringIO import StringIO
11from sys import stderr
12import threading
13
def shrepr(str):
    """
    Escape an arbitrary string so that it is safe to pass it as
    argument to a shell command.

    The result is the text wrapped in double quotes, with a backslash
    put in front of each of the four characters that remain special
    inside double quotes for a POSIX shell: backslash, double quote,
    dollar sign and backquote.

    Glob and grouping characters (``*``, ``?``, ``(``, ``)``, ``[``,
    ``]``) are already literal inside double quotes.  They must NOT be
    escaped there: the shell would keep the backslash and hand it to
    the command as part of the argument.

    :param str: the text to quote (the name shadows the builtin; it is
                kept unchanged for callers passing it by keyword)
    :return: the double-quoted, escaped text
    """

    # The backslash has to be processed first, otherwise the backslashes
    # added for the other characters would themselves get doubled.
    for special in ('\\', '"', '$', '`'):
        str = str.replace(special, '\\' + special)

    return '"' + str + '"'
32
33
class ReopenableNamedTemporaryFile:
    """
    This uses tempfile.mkstemp() to generate a secure temp file.  It
    then closes the file, leaving a zero-length file as a placeholder.
    You can get the filename with ReopenableNamedTemporaryFile.name.
    When the ReopenableNamedTemporaryFile instance is garbage
    collected or its shutdown() method is called, it deletes the file.

    Copied from Zooko's pyutil.fileutil, http://zooko.com/repos/pyutil
    """

    def __init__(self, suffix=None, prefix=None, dir=None, text=None):
        """
        Create the placeholder file.

        ``suffix``, ``prefix``, ``dir`` and ``text`` have the same
        meaning as for tempfile.mkstemp(); any of them left to ``None``
        is not passed along, so that mkstemp() applies its own default.
        """
        from os import close
        from tempfile import mkstemp

        # Define these before anything can fail, so that shutdown()
        # (and thus __del__) is safe on a half-built instance.
        self.name = None
        self._removed = False

        # Forward only the explicitly given options: older mkstemp()
        # implementations choke on a None suffix or prefix.
        options = {}
        if suffix is not None:
            options['suffix'] = suffix
        if prefix is not None:
            options['prefix'] = prefix
        if dir is not None:
            options['dir'] = dir
        if text is not None:
            options['text'] = text

        descriptor, self.name = mkstemp(**options)
        # Only the name is of interest: release the descriptor right
        # away instead of leaking it for the lifetime of the process.
        close(descriptor)

    def __del__(self):
        self.shutdown()

    def shutdown(self):
        """
        Delete the file.

        Calling this more than once is harmless, as is the case where
        something else has already removed the file.  Any other failure
        to remove it is propagated as OSError.
        """
        from errno import ENOENT
        from os import remove

        name = getattr(self, 'name', None)
        if name is None or getattr(self, '_removed', False):
            return

        try:
            remove(name)
        except OSError as error:
            # A file that is already gone is exactly what we wanted.
            if error.errno != ENOENT:
                raise
        self._removed = True
56
57
class VerboseStringIO(StringIO):
    """In-memory buffer that shows progress on stderr while it is filled."""

    def write(self, s):
        """Store ``s`` in the buffer, then print one dot per newline in it."""

        StringIO.write(self, s)
        newlines = s.count('\n')
        stderr.write('.' * newlines)
65
def joinall(threadlist):
    """Wait for every thread in ``threadlist`` to finish, in the given order."""
    for worker in threadlist:
        worker.join()
69
class SystemCommand(object):
    """Wrap a single command to be executed by the shell.

    An instance is a callable: calling it interpolates the keyword
    arguments into the command template, runs the result through the
    shell, optionally feeding it some input and/or collecting its
    output, and records the resulting status in ``exit_status``.
    """

    COMMAND = None
    """The default command for this class.  Must be redefined by subclasses."""

    VERBOSE = True
    """Print the executed command on stderr, at each run."""

    # In this class the setting is applied to the ``input`` text, which
    # is encoded with this charset before being written to the command.
    FORCE_ENCODING = None
    """Force the output encoding to some other charset instead of user prefs."""
   
    def __init__(self, command=None, working_dir=None):
        """Initialize a SystemCommand instance, specifying the command
           to be executed and optionally the working directory.

           ``command`` is a ``%``-format template, interpolated with
           the keyword arguments given at call time; when omitted (or
           empty) the class level ``COMMAND`` is used instead."""
       
        self.command = command or self.COMMAND
        """The command to be executed."""
       
        self.working_dir = working_dir
        """The working directory, go there before execution."""
       
        self.exit_status = None
        """Once the command has been executed, this is its exit status."""
       
    def __call__(self, output=None, input=None, dry_run=False, **kwargs):
        """Execute the command.

        :param output: a false value means the output of the command is
            not collected; ``True`` means collect it in a new in-memory
            buffer; any other true value is used as the file-like
            object the output gets copied into.
        :param input: when true, the text written to the command's
            standard input (encoded with ``FORCE_ENCODING`` if set).
        :param dry_run: when true, the command is (if ``VERBOSE``)
            printed but not executed, and ``None`` is returned.
        :param kwargs: values interpolated into the command template
            with the ``%`` operator; ``working_dir`` is also looked up
            here when the instance has no working directory of its own.
        :return: the output object, rewound to its start when the
            output was collected; otherwise ``output`` as given.

        The status of the command is stored in ``self.exit_status``, as
        reported by ``os.wait()``, by the ``close()`` of the
        ``os.popen()`` file, or by ``os.system()``, depending on the
        branch taken.
        """
       
        from os import system, popen, popen2, wait, chdir
        from shutil import copyfileobj
        threadlist = []
       
        # The instance setting wins over the per-call keyword.
        # NOTE(review): chdir() changes the directory of the whole
        # process and nothing here restores it; this also happens on a
        # dry run, since it precedes that check.
        wdir = self.working_dir or kwargs.get('working_dir')
        if wdir:
            chdir(wdir)

        command = self.command % kwargs
        if self.VERBOSE:
            stderr.write("%s " % command)

        if dry_run:
            if self.VERBOSE:
                stderr.write(" [dry run]\n")
            return
       
        if output:
            # The caller wants the output of the command.
            if output is True:
                # No target supplied: collect into a fresh buffer, the
                # verbose flavour giving a feedback on stderr.
                if self.VERBOSE:
                    output = VerboseStringIO()
                else:
                    output = StringIO()

            if input:
                inp, out = popen2(command)
                # Feed the command from a separate thread, so that
                # writing its input and reading its output go on at the
                # same time.
                def handleinp():
                    if self.FORCE_ENCODING:
                        inp.write(input.encode(self.FORCE_ENCODING))
                    else:
                        inp.write(input)
                    inp.close()
                inpthread = threading.Thread(target = handleinp)
                inpthread.start()
                threadlist.append(inpthread)
            else:
                out = popen(command)

            def handleout():
                copyfileobj(out, output, length=128)
                # Rewind, so the caller reads from the beginning.
                output.seek(0)
            outthread = threading.Thread(target = handleout)
            outthread.start()
            threadlist.append(outthread)

            joinall(threadlist)

            if input:
                # NOTE(review): os.wait() reports whichever child
                # process terminates -- presumably this command is the
                # only child at this point; confirm.
                self.exit_status = wait()[1]
                out.close()
            else:
                # A false close() result is stored as 0.
                self.exit_status = out.close() or 0
        else:
            # The output of the command is not collected.
            if input:
                inp, out = popen2(command)
                if self.FORCE_ENCODING:
                    inp.write(input.encode(self.FORCE_ENCODING))
                else:
                    inp.write(input)
                inp.close()
                # NOTE(review): the reading end is closed without being
                # read, before waiting for the command -- verify this
                # is fine for commands that do print something.
                out.close()
                self.exit_status = wait()[1]
            else:
                self.exit_status = system(command)           
                   
        if self.VERBOSE:
            if not self.exit_status:
                stderr.write(" [Ok]\n")
            else:
                stderr.write(" [Error %s]\n" % self.exit_status)
               
        return output
170
Note: See TracBrowser for help on using the repository browser.