source: tailor/vcpx/shwrap.py @ 987

Revision 987, 6.8 KB checked in by lele@…, 8 years ago (diff)

Override also LC_ALL when using LANG=C
Tailor needs to parse the output of the external commands, so it
uses LANG=C when needed. Since LC_ALL has a higher priority than
LANG, it needs to be overridden as well.

Line 
1# -*- mode: python; coding: iso-8859-1 -*-
2# :Progetto: vcpx -- Tiny wrapper around external command
3# :Creato:   sab 10 apr 2004 16:43:48 CEST
4# :Autore:   Lele Gaifax <lele@nautilus.homeip.net>
5# :Licenza:  GNU General Public License
6#
7
8__docformat__ = 'reStructuredText'
9
10try:
11    # Python 2.4
12    from subprocess import Popen, PIPE, STDOUT
13except ImportError:
14    # Older snakes
15    from _process import Popen, PIPE, STDOUT
16
class ReopenableNamedTemporaryFile:
    """
    A securely created, zero-length placeholder file that can be
    reopened by name.

    The file is created with tempfile.mkstemp() and its descriptor is
    closed right away, so only the empty file remains on disk.  Its
    path is available as the ``name`` attribute.  The file is deleted
    when shutdown() is called or when the instance is garbage
    collected, whichever happens first.

    Copied from Zooko's pyutil.fileutil, http://zooko.com/repos/pyutil
    """

    def __init__(self, suffix=None, prefix=None, dir=None, text=None):
        """
        Create the placeholder file.

        ``suffix``, ``prefix``, ``dir`` and ``text`` have the same
        meaning as in tempfile.mkstemp(); any of them left to ``None``
        falls back to mkstemp()'s own default.
        """

        from tempfile import mkstemp
        from os import close

        # Forward only what the caller really specified: older
        # mkstemp() implementations concatenate prefix and suffix
        # as strings and choke on an explicit None.
        options = {}
        if suffix is not None:
            options['suffix'] = suffix
        if prefix is not None:
            options['prefix'] = prefix
        if dir is not None:
            options['dir'] = dir
        if text is not None:
            options['text'] = text

        fd, self.name = mkstemp(**options)
        close(fd)

    def __del__(self):
        self.shutdown()

    def shutdown(self):
        """
        Delete the file.  Calling this more than once is harmless.
        """

        from os import remove
        from os.path import exists

        # The attribute is missing when mkstemp() failed in __init__
        # and we are being called by __del__.
        name = getattr(self, 'name', None)
        if name is None:
            return

        try:
            remove(name)
        except OSError:
            # Already gone (typically an explicit shutdown() followed
            # by garbage collection): nothing left to do.  Any other
            # failure leaves the file in place and is propagated.
            if exists(name):
                raise
41
42
class ExternalCommand:
    """Wrap a single external command, executed as a subprocess."""

    DEBUG = False
    """Print the output of the command, when not PIPEd to the caller."""

    DRY_RUN = False
    """Don't really execute the command."""

    FORCE_ENCODING = None
    """Force the output encoding to some other charset instead of user prefs."""

    def __init__(self, command=None, cwd=None):
        """
        Initialize an ExternalCommand instance, specifying the command
        to be executed and optionally the working directory.

        ``command`` is the list of strings forming the command line;
        each item may contain ``%(name)s`` placeholders that execute()
        fills from its keyword arguments.

        The instance will use the logger ``tailor.shell``.
        """

        from logging import getLogger

        self.command = command
        """The command to be executed."""

        self.cwd = cwd
        """The working directory, go there before execution."""

        self.exit_status = None
        """Once the command has been executed, this is its exit status."""

        self._last_command = None
        """Last executed command."""

        self.log = getLogger('tailor.shell')

    def __str__(self):
        """
        Return a string representation of the command prefixed by working dir.
        """

        r = '$'+repr(self)
        if self.cwd:
            r = self.cwd + ' ' + r
        return r

    def __repr__(self):
        """
        Compute a reasonable shell-like representation of the external command.

        The last executed command line is shown when there is one,
        otherwise the command template.  Every argument is preceded
        by a space, the first one included.
        """

        result = []
        # Fall back to an empty list, so that an instance created
        # without a command can still be printed and logged.
        for arg in self._last_command or self.command or []:
            bs_buf = []

            # Add a space to separate this argument from the others
            result.append(' ')

            needquote = (" " in arg) or ("\t" in arg)
            if needquote:
                result.append('"')

            for c in arg:
                if c == '\\':
                    # Don't know if we need to double yet.
                    bs_buf.append(c)
                elif c == '"':
                    # Double backslashes.
                    result.append('\\' * len(bs_buf)*2)
                    bs_buf = []
                    result.append('\\"')
                else:
                    # Normal char
                    if bs_buf:
                        result.extend(bs_buf)
                        bs_buf = []
                    result.append(c)

            # Add remaining backslashes, if any.
            if bs_buf:
                result.extend(bs_buf)

            if needquote:
                # Emit the trailing backslashes a second time, so that
                # they do not escape the closing quote.
                result.extend(bs_buf)
                result.append('"')

        return ''.join(result)

    def execute(self, *args, **kwargs):
        """
        Execute the command.

        Positional arguments are appended to the command line; a
        single list argument is expanded into its items.  Keyword
        arguments are used to fill the ``%(name)s`` placeholders of
        the command and, in addition, the following ones are
        recognized:

        cwd
          working directory, overriding the one given at creation

        env
          complete environment for the subprocess; when missing, a
          copy of the current environment is used instead

        LANG, TZ, PATH
          override the respective variables in that copy (ignored
          when ``env`` is given); ``LANG`` overrides ``LC_ALL`` and
          ``LC_MESSAGES`` as well

        input
          data to be sent to the standard input of the command

        stdout, stderr
          passed to ``Popen``; when missing and DEBUG is off, the
          stream is redirected to the null device

        Return a tuple ``(out, err)``, each item being either a
        ``StringIO`` over the captured stream or ``None`` when that
        stream was not captured.  Under DRY_RUN the command is only
        logged and ``None`` is returned.

        Raise ``OSError`` when the subprocess cannot be started.
        """

        from sys import stderr
        from locale import getpreferredencoding
        from os import environ, getcwd

        self.exit_status = None

        self._last_command = [chunk % kwargs for chunk in self.command]
        if len(args) == 1 and isinstance(args[0], list):
            self._last_command.extend(args[0])
        else:
            self._last_command.extend(args)

        self.log.info(self)

        if self.DRY_RUN:
            return

        if 'cwd' not in kwargs and self.cwd:
            kwargs['cwd'] = self.cwd

        self.log.debug("Executing %r (%r)", self, kwargs.get('cwd', getcwd()))

        if 'env' not in kwargs:
            env = kwargs['env'] = {}
            env.update(environ)

            for v in ['LANG', 'TZ', 'PATH']:
                if v in kwargs:
                    env[v] = kwargs[v]
            # Override also LC_ALL that has a higher priority over LANG,
            # and LC_MESSAGES as well.
            if 'LANG' in kwargs:
                env['LC_ALL'] = kwargs['LANG']
                env['LC_MESSAGES'] = kwargs['LANG']

        stdin_data = kwargs.get('input')
        output = kwargs.get('stdout')
        error = kwargs.get('stderr')

        # Files opened here on behalf of the caller, to be closed
        # once the command has terminated.
        opened = []
        try:
            # When not in debug, redirect stderr and stdout to /dev/null
            # when the caller didn't ask for them.
            if not self.DEBUG:
                try:
                    from os import devnull
                except ImportError:
                    devnull = '/dev/null'
                if output is None:
                    output = open(devnull, 'w')
                    opened.append(output)
                if error is None:
                    error = open(devnull, 'w')
                    opened.append(error)

            try:
                # Open a pipe on stdin only when there is something
                # to feed the command with.
                process = Popen(self._last_command,
                                stdin=stdin_data and PIPE or None,
                                stdout=output,
                                stderr=error,
                                env=kwargs.get('env'),
                                cwd=kwargs.get('cwd'),
                                universal_newlines=True)
            except OSError:
                from sys import exc_info
                from errno import ENOENT

                # Give a more explicit message for the most common
                # failure, propagate anything else as is.
                if exc_info()[1].errno == ENOENT:
                    raise OSError("'%s' does not exist!" %
                                  self._last_command[0])
                else:
                    raise

            if stdin_data and isinstance(stdin_data, unicode):
                stdin_data = stdin_data.encode(self.FORCE_ENCODING or
                                               getpreferredencoding())

            out, err = process.communicate(input=stdin_data)
        finally:
            for sink in opened:
                sink.close()

        self.exit_status = process.returncode
        if not self.exit_status:
            self.log.info("[Ok]")
        else:
            self.log.warning("[Status %s]", self.exit_status)

        # For debug purposes, copy the captured output to our stderr
        if self.DEBUG:
            if out and output == PIPE:
                stderr.write('Output stream:\n')
                stderr.write(out)
            if err and error == PIPE:
                stderr.write('Error stream:\n')
                stderr.write(err)

        # Imported here, where it is actually needed
        from cStringIO import StringIO

        if out is not None:
            out = StringIO(out)
        if err is not None:
            err = StringIO(err)

        return out, err
Note: See TracBrowser for help on using the repository browser.