6ea0430447def3cda2ad64d67a9b10466712fe15
[melted] / src / mvcp / mvcp_remote.c
1 /*
2 * mvcp_remote.c -- Remote Parser
3 * Copyright (C) 2002-2009 Ushodaya Enterprises Limited
4 * Author: Charles Yates <charles.yates@pandora.be>
5 *
6 * This library is free software; you can redistribute it and/or
7 * modify it under the terms of the GNU Lesser General Public
8 * License as published by the Free Software Foundation; either
9 * version 2.1 of the License, or (at your option) any later version.
10 *
11 * This library is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 * Lesser General Public License for more details.
15 *
16 * You should have received a copy of the GNU Lesser General Public
17 * License along with this library; if not, write to the Free Software
18 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19 */
20
21 /* System header files */
22 #include <stdio.h>
23 #include <stdlib.h>
24 #include <string.h>
25 #include <signal.h>
26 #include <errno.h>
27 #include <pthread.h>
28
29 /* Application header files */
30 #include <framework/mlt.h>
31 #include "mvcp_remote.h"
32 #include "mvcp_socket.h"
33 #include "mvcp_tokeniser.h"
34 #include "mvcp_util.h"
35
36 /** Private mvcp_remote structure.
37 */
38
39 typedef struct
40 {
41 int terminated;
42 char *server;
43 int port;
44 mvcp_socket socket;
45 mvcp_socket status;
46 pthread_t thread;
47 mvcp_parser parser;
48 pthread_mutex_t mutex;
49 int connected;
50 }
51 *mvcp_remote, mvcp_remote_t;
52
53 /** Forward declarations.
54 */
55
56 static mvcp_response mvcp_remote_connect( mvcp_remote );
57 static mvcp_response mvcp_remote_execute( mvcp_remote, char * );
58 static mvcp_response mvcp_remote_receive( mvcp_remote, char *, char * );
59 static mvcp_response mvcp_remote_push( mvcp_remote, char *, mlt_service );
60 static void mvcp_remote_close( mvcp_remote );
61 static int mvcp_remote_read_response( mvcp_socket, mvcp_response );
62
63 /** MVCP Parser constructor.
64 */
65
66 mvcp_parser mvcp_parser_init_remote( char *server, int port )
67 {
68 mvcp_parser parser = calloc( 1, sizeof( mvcp_parser_t ) );
69 mvcp_remote remote = calloc( 1, sizeof( mvcp_remote_t ) );
70
71 if ( parser != NULL )
72 {
73 parser->connect = (parser_connect)mvcp_remote_connect;
74 parser->execute = (parser_execute)mvcp_remote_execute;
75 parser->push = (parser_push)mvcp_remote_push;
76 parser->received = (parser_received)mvcp_remote_receive;
77 parser->close = (parser_close)mvcp_remote_close;
78 parser->real = remote;
79
80 if ( remote != NULL )
81 {
82 remote->parser = parser;
83 remote->server = strdup( server );
84 remote->port = port;
85 pthread_mutex_init( &remote->mutex, NULL );
86 }
87 }
88 return parser;
89 }
90
91 /** Thread for receiving and distributing the status information.
92 */
93
94 static void *mvcp_remote_status_thread( void *arg )
95 {
96 mvcp_remote remote = arg;
97 char temp[ 10240 ];
98 int length = 0;
99 int offset = 0;
100 mvcp_tokeniser tokeniser = mvcp_tokeniser_init( );
101 mvcp_notifier notifier = mvcp_parser_get_notifier( remote->parser );
102 mvcp_status_t status;
103 int index = 0;
104
105 mvcp_socket_write_data( remote->status, "STATUS\r\n", 8 );
106
107 while ( !remote->terminated &&
108 ( length = mvcp_socket_read_data( remote->status, temp + offset, sizeof( temp ) ) ) >= 0 )
109 {
110 if ( strchr( temp, '\n' ) == NULL )
111 {
112 offset = length;
113 continue;
114 }
115 offset = 0;
116 mvcp_tokeniser_parse_new( tokeniser, temp, "\n" );
117 for ( index = 0; index < mvcp_tokeniser_count( tokeniser ); index ++ )
118 {
119 char *line = mvcp_tokeniser_get_string( tokeniser, index );
120 if ( line[ strlen( line ) - 1 ] == '\r' )
121 {
122 mvcp_util_chomp( line );
123 mvcp_status_parse( &status, line );
124 mvcp_notifier_put( notifier, &status );
125 }
126 else
127 {
128 strcpy( temp, line );
129 offset = strlen( temp );
130 }
131 }
132 }
133
134 mvcp_notifier_disconnected( notifier );
135 mvcp_tokeniser_close( tokeniser );
136 remote->terminated = 1;
137
138 return NULL;
139 }
140
141 /** Forward reference.
142 */
143
144 static void mvcp_remote_disconnect( mvcp_remote remote );
145
146 /** Connect to the server.
147 */
148
149 static mvcp_response mvcp_remote_connect( mvcp_remote remote )
150 {
151 mvcp_response response = NULL;
152
153 mvcp_remote_disconnect( remote );
154
155 if ( !remote->connected )
156 {
157 signal( SIGPIPE, SIG_IGN );
158
159 remote->socket = mvcp_socket_init( remote->server, remote->port );
160 remote->status = mvcp_socket_init( remote->server, remote->port );
161
162 if ( mvcp_socket_connect( remote->socket ) == 0 )
163 {
164 response = mvcp_response_init( );
165 mvcp_remote_read_response( remote->socket, response );
166 }
167
168 if ( response != NULL && mvcp_socket_connect( remote->status ) == 0 )
169 {
170 mvcp_response status_response = mvcp_response_init( );
171 mvcp_remote_read_response( remote->status, status_response );
172 if ( mvcp_response_get_error_code( status_response ) == 100 )
173 pthread_create( &remote->thread, NULL, mvcp_remote_status_thread, remote );
174 mvcp_response_close( status_response );
175 remote->connected = 1;
176 }
177 }
178
179 return response;
180 }
181
182 /** Execute the command.
183 */
184
185 static mvcp_response mvcp_remote_execute( mvcp_remote remote, char *command )
186 {
187 mvcp_response response = NULL;
188 pthread_mutex_lock( &remote->mutex );
189 if ( mvcp_socket_write_data( remote->socket, command, strlen( command ) ) == strlen( command ) )
190 {
191 response = mvcp_response_init( );
192 mvcp_socket_write_data( remote->socket, "\r\n", 2 );
193 mvcp_remote_read_response( remote->socket, response );
194 }
195 pthread_mutex_unlock( &remote->mutex );
196 return response;
197 }
198
199 /** Push a MLT XML document to the server.
200 */
201
202 static mvcp_response mvcp_remote_receive( mvcp_remote remote, char *command, char *buffer )
203 {
204 mvcp_response response = NULL;
205 pthread_mutex_lock( &remote->mutex );
206 if ( mvcp_socket_write_data( remote->socket, command, strlen( command ) ) == strlen( command ) )
207 {
208 char temp[ 20 ];
209 int length = strlen( buffer );
210 response = mvcp_response_init( );
211 mvcp_socket_write_data( remote->socket, "\r\n", 2 );
212 sprintf( temp, "%d", length );
213 mvcp_socket_write_data( remote->socket, temp, strlen( temp ) );
214 mvcp_socket_write_data( remote->socket, "\r\n", 2 );
215 mvcp_socket_write_data( remote->socket, buffer, length );
216 mvcp_socket_write_data( remote->socket, "\r\n", 2 );
217 mvcp_remote_read_response( remote->socket, response );
218 }
219 pthread_mutex_unlock( &remote->mutex );
220 return response;
221 }
222
223 /** Push a producer to the server.
224 */
225
226 static mvcp_response mvcp_remote_push( mvcp_remote remote, char *command, mlt_service service )
227 {
228 mvcp_response response = NULL;
229 if ( service != NULL )
230 {
231 mlt_consumer consumer = mlt_factory_consumer( NULL, "xml", "buffer" );
232 mlt_properties properties = MLT_CONSUMER_PROPERTIES( consumer );
233 char *buffer = NULL;
234 // Temporary hack
235 mlt_properties_set( properties, "store", "nle_" );
236 mlt_consumer_connect( consumer, service );
237 mlt_consumer_start( consumer );
238 buffer = mlt_properties_get( properties, "buffer" );
239 response = mvcp_remote_receive( remote, command, buffer );
240 mlt_consumer_close( consumer );
241 }
242 return response;
243 }
244
245 /** Disconnect.
246 */
247
248 static void mvcp_remote_disconnect( mvcp_remote remote )
249 {
250 if ( remote != NULL && remote->terminated )
251 {
252 if ( remote->connected )
253 pthread_join( remote->thread, NULL );
254 mvcp_socket_close( remote->status );
255 mvcp_socket_close( remote->socket );
256 remote->connected = 0;
257 remote->terminated = 0;
258 }
259 }
260
261 /** Close the parser.
262 */
263
264 static void mvcp_remote_close( mvcp_remote remote )
265 {
266 if ( remote != NULL )
267 {
268 remote->terminated = 1;
269 mvcp_remote_disconnect( remote );
270 pthread_mutex_destroy( &remote->mutex );
271 free( remote->server );
272 free( remote );
273 }
274 }
275
276 /** Read response.
277 */
278
279 static int mvcp_remote_read_response( mvcp_socket socket, mvcp_response response )
280 {
281 char temp[ 10240 ];
282 int length;
283 int terminated = 0;
284
285 while ( !terminated && ( length = mvcp_socket_read_data( socket, temp, 10240 ) ) >= 0 )
286 {
287 int position = 0;
288 temp[ length ] = '\0';
289 mvcp_response_write( response, temp, length );
290 position = mvcp_response_count( response ) - 1;
291 if ( position < 0 || temp[ strlen( temp ) - 1 ] != '\n' )
292 continue;
293 switch( mvcp_response_get_error_code( response ) )
294 {
295 case 201:
296 case 500:
297 terminated = !strcmp( mvcp_response_get_line( response, position ), "" );
298 break;
299 case 202:
300 terminated = mvcp_response_count( response ) >= 2;
301 break;
302 default:
303 terminated = 1;
304 break;
305 }
306 }
307
308 return 0;
309 }