source: tailor/vcpx/shwrap.py @ 1016

Revision 1016, 6.9 KB checked in by lele@…, 8 years ago (diff)

Use the new encode facility of the Repository

Line 
1# -*- mode: python; coding: iso-8859-1 -*-
2# :Progetto: vcpx -- Tiny wrapper around external command
3# :Creato:   sab 10 apr 2004 16:43:48 CEST
4# :Autore:   Lele Gaifax <lele@nautilus.homeip.net>
5# :Licenza:  GNU General Public License
6#
7
8__docformat__ = 'reStructuredText'
9
10try:
11    # Python 2.4
12    from subprocess import Popen, PIPE, STDOUT
13except ImportError:
14    # Older snakes
15    from _process import Popen, PIPE, STDOUT
16
17class ReopenableNamedTemporaryFile:
18    """
19    This uses tempfile.mkstemp() to generate a secure temp file.  It
20    then closes the file, leaving a zero-length file as a placeholder.
21    You can get the filename with ReopenableNamedTemporaryFile.name.
22    When the ReopenableNamedTemporaryFile instance is garbage
23    collected or its shutdown() method is called, it deletes the file.
24
25    Copied from Zooko's pyutil.fileutil, http://zooko.com/repos/pyutil
26    """
27    def __init__(self, suffix=None, prefix=None, dir=None, text=None):
28        from tempfile import mkstemp
29        from os import close
30
31        fd, self.name = mkstemp(suffix, prefix, dir, text)
32        close(fd)
33
34    def __del__(self):
35        self.shutdown()
36
37    def shutdown(self):
38        from os import remove
39
40        remove(self.name)
41
42
43class ExternalCommand:
44    """Wrap a single command to be executed by the shell."""
45
46    DEBUG = False
47    """Print the output of the command, when not PIPEd to the caller."""
48
49    DRY_RUN = False
50    """Don't really execute the command."""
51
52    def __init__(self, command=None, cwd=None):
53        """
54        Initialize a ExternalCommand instance, specifying the command
55        to be executed and eventually the working directory.
56
57        The instance will use the logger ``tailor.shell``.
58        """
59
60        from logging import getLogger
61
62        self.command = command
63        """The command to be executed."""
64
65        self.cwd = cwd
66        """The working directory, go there before execution."""
67
68        self.exit_status = None
69        """Once the command has been executed, this is its exit status."""
70
71        self._last_command = None
72        """Last executed command."""
73
74        self.log = getLogger('tailor.shell')
75
76    def __str__(self):
77        """
78        Return a string representation of the command prefixed by working dir.
79        """
80
81        r = '$'+repr(self)
82        if self.cwd:
83            r = self.cwd + ' ' + r
84        return r
85
86    def __repr__(self):
87        """
88        Compute a reasonable shell-like representation of the external command.
89        """
90
91        result = []
92        needquote = False
93        for arg in self._last_command or self.command:
94            bs_buf = []
95
96            # Add a space to separate this argument from the others
97            result.append(' ')
98
99            needquote = (" " in arg) or ("\t" in arg)
100            if needquote:
101                result.append('"')
102
103            for c in arg:
104                if c == '\\':
105                    # Don't know if we need to double yet.
106                    bs_buf.append(c)
107                elif c == '"':
108                    # Double backspaces.
109                    result.append('\\' * len(bs_buf)*2)
110                    bs_buf = []
111                    result.append('\\"')
112                else:
113                    # Normal char
114                    if bs_buf:
115                        result.extend(bs_buf)
116                        bs_buf = []
117                    result.append(c)
118
119            # Add remaining backspaces, if any.
120            if bs_buf:
121                result.extend(bs_buf)
122
123            if needquote:
124                result.extend(bs_buf)
125                result.append('"')
126
127        return ''.join(result)
128
129    def execute(self, *args, **kwargs):
130        """Execute the command."""
131
132        from sys import stderr
133        from locale import getpreferredencoding
134        from os import environ, getcwd
135        from cStringIO import StringIO
136
137        self.exit_status = None
138
139        self._last_command = [chunk % kwargs for chunk in self.command]
140        if len(args) == 1 and type(args[0]) == type([]):
141            self._last_command.extend(args[0])
142        else:
143            self._last_command.extend(args)
144
145        self.log.info(self)
146
147        if self.DRY_RUN:
148            return
149
150        if not kwargs.has_key('cwd') and self.cwd:
151            kwargs['cwd'] = self.cwd
152
153        self.log.debug("Executing %r (%r)", self, kwargs.get('cwd', getcwd()))
154
155        if not kwargs.has_key('env'):
156            env = kwargs['env'] = {}
157            env.update(environ)
158
159            for v in ['LANG', 'TZ', 'PATH']:
160                if kwargs.has_key(v):
161                    env[v] = kwargs[v]
162            # Override also LC_ALL that has a higher priority over LANG,
163            # and LC_MESSAGES as well.
164            if kwargs.has_key('LANG'):
165                env['LC_ALL'] = kwargs['LANG']
166                env['LC_MESSAGES'] = kwargs['LANG']
167
168        input = kwargs.get('input')
169        output = kwargs.get('stdout')
170        error = kwargs.get('stderr')
171
172        # When not in debug, redirect stderr and stdout to /dev/null
173        # when the caller didn't ask for them.
174        if not self.DEBUG:
175            try:
176                from os import devnull
177            except ImportError:
178                devnull = '/dev/null'
179            if output is None:
180                output = open(devnull, 'w')
181            if error is None:
182                error = open(devnull, 'w')
183        try:
184            process = Popen(self._last_command,
185                            stdin=input and PIPE or None,
186                            stdout=output,
187                            stderr=error,
188                            env=kwargs.get('env'),
189                            cwd=kwargs.get('cwd'),
190                            universal_newlines=True)
191        except OSError, e:
192            from errno import ENOENT
193
194            if e.errno == ENOENT:
195                raise OSError("'%s' does not exist!" % self._last_command[0])
196            else:
197                raise
198
199        if input and isinstance(input, unicode):
200            encoding = getpreferredencoding()
201            self.log.warning("Using default %s encoding, ignoring errors; "
202                             "caller should use repository's encoding and "
203                             "pass an already encoded input")
204            input = input.encode(encoding, 'ignore')
205
206        out, err = process.communicate(input=input)
207
208        self.exit_status = process.returncode
209        if not self.exit_status:
210            self.log.info("[Ok]")
211        else:
212            self.log.warning("[Status %s]", self.exit_status)
213
214        # For debug purposes, copy the output to our stderr when hidden above
215        if self.DEBUG:
216            if out and output == PIPE:
217                stderr.write('Output stream:\n')
218                stderr.write(out)
219            if err and error == PIPE:
220                stderr.write('Error stream:\n')
221                stderr.write(err)
222
223        if out is not None:
224            out = StringIO(out)
225        if err is not None:
226            err = StringIO(err)
227
228        return out, err
Note: See TracBrowser for help on using the repository browser.