Consumer valerie, pushes, and assorted modifications
[melted] / src / valerie / valerie_remote.c
1 /*
2 * valerie_remote.c -- Remote Parser
3 * Copyright (C) 2002-2003 Ushodaya Enterprises Limited
4 * Author: Charles Yates <charles.yates@pandora.be>
5 *
6 * This library is free software; you can redistribute it and/or
7 * modify it under the terms of the GNU Lesser General Public
8 * License as published by the Free Software Foundation; either
9 * version 2.1 of the License, or (at your option) any later version.
10 *
11 * This library is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 * Lesser General Public License for more details.
15 *
16 * You should have received a copy of the GNU Lesser General Public
17 * License along with this library; if not, write to the Free Software
18 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19 */
20
21 /* System header files */
22 #include <stdio.h>
23 #include <stdlib.h>
24 #include <string.h>
25 #include <signal.h>
26 #include <errno.h>
27 #include <pthread.h>
28
29 /* Application header files */
30 #include <framework/mlt.h>
31 #include "valerie_remote.h"
32 #include "valerie_socket.h"
33 #include "valerie_tokeniser.h"
34 #include "valerie_util.h"
35
36 /** Private valerie_remote structure.
37 */
38
39 typedef struct
40 {
41 int terminated;
42 char *server;
43 int port;
44 valerie_socket socket;
45 valerie_socket status;
46 pthread_t thread;
47 valerie_parser parser;
48 pthread_mutex_t mutex;
49 int connected;
50 }
51 *valerie_remote, valerie_remote_t;
52
53 /** Forward declarations.
54 */
55
56 static valerie_response valerie_remote_connect( valerie_remote );
57 static valerie_response valerie_remote_execute( valerie_remote, char * );
58 static valerie_response valerie_remote_push( valerie_remote, char *, mlt_service );
59 static void valerie_remote_close( valerie_remote );
60 static int valerie_remote_read_response( valerie_socket, valerie_response );
61
62 /** DV Parser constructor.
63 */
64
65 valerie_parser valerie_parser_init_remote( char *server, int port )
66 {
67 valerie_parser parser = calloc( 1, sizeof( valerie_parser_t ) );
68 valerie_remote remote = calloc( 1, sizeof( valerie_remote_t ) );
69
70 if ( parser != NULL )
71 {
72 parser->connect = (parser_connect)valerie_remote_connect;
73 parser->execute = (parser_execute)valerie_remote_execute;
74 parser->push = (parser_push)valerie_remote_push;
75 parser->close = (parser_close)valerie_remote_close;
76 parser->real = remote;
77
78 if ( remote != NULL )
79 {
80 remote->parser = parser;
81 remote->server = strdup( server );
82 remote->port = port;
83 pthread_mutex_init( &remote->mutex, NULL );
84 }
85 }
86 return parser;
87 }
88
89 /** Thread for receiving and distributing the status information.
90 */
91
92 static void *valerie_remote_status_thread( void *arg )
93 {
94 valerie_remote remote = arg;
95 char temp[ 10240 ];
96 int length = 0;
97 int offset = 0;
98 valerie_tokeniser tokeniser = valerie_tokeniser_init( );
99 valerie_notifier notifier = valerie_parser_get_notifier( remote->parser );
100 valerie_status_t status;
101 int index = 0;
102
103 valerie_socket_write_data( remote->status, "STATUS\r\n", 8 );
104
105 while ( !remote->terminated &&
106 ( length = valerie_socket_read_data( remote->status, temp + offset, sizeof( temp ) ) ) >= 0 )
107 {
108 if ( strchr( temp, '\n' ) == NULL )
109 {
110 offset = length;
111 continue;
112 }
113 offset = 0;
114 valerie_tokeniser_parse_new( tokeniser, temp, "\n" );
115 for ( index = 0; index < valerie_tokeniser_count( tokeniser ); index ++ )
116 {
117 char *line = valerie_tokeniser_get_string( tokeniser, index );
118 if ( line[ strlen( line ) - 1 ] == '\r' )
119 {
120 valerie_util_chomp( line );
121 valerie_status_parse( &status, line );
122 valerie_notifier_put( notifier, &status );
123 }
124 else
125 {
126 strcpy( temp, line );
127 offset = strlen( temp );
128 }
129 }
130 }
131
132 valerie_notifier_disconnected( notifier );
133 valerie_tokeniser_close( tokeniser );
134 remote->terminated = 1;
135
136 return NULL;
137 }
138
139 /** Forward reference.
140 */
141
142 static void valerie_remote_disconnect( valerie_remote remote );
143
144 /** Connect to the server.
145 */
146
147 static valerie_response valerie_remote_connect( valerie_remote remote )
148 {
149 valerie_response response = NULL;
150
151 valerie_remote_disconnect( remote );
152
153 if ( !remote->connected )
154 {
155 signal( SIGPIPE, SIG_IGN );
156
157 remote->socket = valerie_socket_init( remote->server, remote->port );
158 remote->status = valerie_socket_init( remote->server, remote->port );
159
160 if ( valerie_socket_connect( remote->socket ) == 0 )
161 {
162 response = valerie_response_init( );
163 valerie_remote_read_response( remote->socket, response );
164 }
165
166 if ( response != NULL && valerie_socket_connect( remote->status ) == 0 )
167 {
168 valerie_response status_response = valerie_response_init( );
169 valerie_remote_read_response( remote->status, status_response );
170 if ( valerie_response_get_error_code( status_response ) == 100 )
171 pthread_create( &remote->thread, NULL, valerie_remote_status_thread, remote );
172 valerie_response_close( status_response );
173 remote->connected = 1;
174 }
175 }
176
177 return response;
178 }
179
180 /** Execute the command.
181 */
182
183 static valerie_response valerie_remote_execute( valerie_remote remote, char *command )
184 {
185 valerie_response response = NULL;
186 pthread_mutex_lock( &remote->mutex );
187 if ( valerie_socket_write_data( remote->socket, command, strlen( command ) ) == strlen( command ) )
188 {
189 response = valerie_response_init( );
190 valerie_socket_write_data( remote->socket, "\r\n", 2 );
191 valerie_remote_read_response( remote->socket, response );
192 }
193 pthread_mutex_unlock( &remote->mutex );
194 return response;
195 }
196
197 /** Push a producer to the server.
198 */
199
200 static valerie_response valerie_remote_push( valerie_remote remote, char *command, mlt_service service )
201 {
202 valerie_response response = NULL;
203 pthread_mutex_lock( &remote->mutex );
204 if ( valerie_socket_write_data( remote->socket, command, strlen( command ) ) == strlen( command ) )
205 {
206 mlt_consumer consumer = mlt_factory_consumer( "westley", "buffer" );
207 mlt_properties properties = mlt_consumer_properties( consumer );
208 char temp[ 20 ];
209 char *buffer = NULL;
210 int length = 0;
211 mlt_consumer_connect( consumer, service );
212 response = valerie_response_init( );
213 valerie_socket_write_data( remote->socket, "\r\n", 2 );
214 mlt_consumer_start( consumer );
215 buffer = mlt_properties_get_data( properties, "buffer", &length );
216 sprintf( temp, "%d", length );
217 valerie_socket_write_data( remote->socket, temp, strlen( temp ) );
218 valerie_socket_write_data( remote->socket, "\r\n", 2 );
219 valerie_socket_write_data( remote->socket, buffer, length );
220 valerie_socket_write_data( remote->socket, "\r\n", 2 );
221 valerie_remote_read_response( remote->socket, response );
222 mlt_consumer_close( consumer );
223 }
224 pthread_mutex_unlock( &remote->mutex );
225 return response;
226 }
227
228 /** Disconnect.
229 */
230
231 static void valerie_remote_disconnect( valerie_remote remote )
232 {
233 if ( remote != NULL && remote->terminated )
234 {
235 if ( remote->connected )
236 pthread_join( remote->thread, NULL );
237 valerie_socket_close( remote->status );
238 valerie_socket_close( remote->socket );
239 remote->connected = 0;
240 remote->terminated = 0;
241 }
242 }
243
244 /** Close the parser.
245 */
246
247 static void valerie_remote_close( valerie_remote remote )
248 {
249 if ( remote != NULL )
250 {
251 remote->terminated = 1;
252 valerie_remote_disconnect( remote );
253 pthread_mutex_destroy( &remote->mutex );
254 free( remote->server );
255 free( remote );
256 }
257 }
258
259 /** Read response.
260 */
261
262 static int valerie_remote_read_response( valerie_socket socket, valerie_response response )
263 {
264 char temp[ 10240 ];
265 int length;
266 int terminated = 0;
267
268 while ( !terminated && ( length = valerie_socket_read_data( socket, temp, 10240 ) ) >= 0 )
269 {
270 int position = 0;
271 temp[ length ] = '\0';
272 valerie_response_write( response, temp, length );
273 position = valerie_response_count( response ) - 1;
274 if ( position < 0 || temp[ strlen( temp ) - 1 ] != '\n' )
275 continue;
276 switch( valerie_response_get_error_code( response ) )
277 {
278 case 201:
279 case 500:
280 terminated = !strcmp( valerie_response_get_line( response, position ), "" );
281 break;
282 case 202:
283 terminated = valerie_response_count( response ) >= 2;
284 break;
285 default:
286 terminated = 1;
287 break;
288 }
289 }
290
291 return 0;
292 }